mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 21:35:52 +00:00
2134 lines
57 KiB
Go
2134 lines
57 KiB
Go
package consul
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/armon/go-radix"
|
|
"github.com/armon/gomdb"
|
|
"github.com/hashicorp/consul/consul/structs"
|
|
)
|
|
|
|
const (
|
|
dbNodes = "nodes"
|
|
dbServices = "services"
|
|
dbChecks = "checks"
|
|
dbKVS = "kvs"
|
|
dbTombstone = "tombstones"
|
|
dbSessions = "sessions"
|
|
dbSessionChecks = "sessionChecks"
|
|
dbACLs = "acls"
|
|
dbMaxMapSize32bit uint64 = 128 * 1024 * 1024 // 128MB maximum size
|
|
dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size
|
|
dbMaxReaders uint = 4096 // 4K, default is 126
|
|
)
|
|
|
|
// kvMode is used internally to control which type of set
|
|
// operation we are performing
|
|
type kvMode int
|
|
|
|
const (
|
|
kvSet kvMode = iota
|
|
kvCAS
|
|
kvLock
|
|
kvUnlock
|
|
)
|
|
|
|
// The StateStore is responsible for maintaining all the Consul
|
|
// state. It is manipulated by the FSM which maintains consistency
|
|
// through the use of Raft. The goals of the StateStore are to provide
|
|
// high concurrency for read operations without blocking writes, and
|
|
// to provide write availability in the face of reads. The current
|
|
// implementation uses the Lightning Memory-Mapped Database (MDB).
|
|
// This gives us Multi-Version Concurrency Control for "free"
|
|
type StateStore struct {
|
|
logger *log.Logger
|
|
path string
|
|
env *mdb.Env
|
|
nodeTable *MDBTable
|
|
serviceTable *MDBTable
|
|
checkTable *MDBTable
|
|
kvsTable *MDBTable
|
|
tombstoneTable *MDBTable
|
|
sessionTable *MDBTable
|
|
sessionCheckTable *MDBTable
|
|
aclTable *MDBTable
|
|
tables MDBTables
|
|
watch map[*MDBTable]*NotifyGroup
|
|
queryTables map[string]MDBTables
|
|
|
|
// kvWatch is a more optimized way of watching for KV changes.
|
|
// Instead of just using a NotifyGroup for the entire table,
|
|
// a watcher is instantiated on a given prefix. When a change happens,
|
|
// only the relevant watchers are woken up. This reduces the cost of
|
|
// watching for KV changes.
|
|
kvWatch *radix.Tree
|
|
kvWatchLock sync.Mutex
|
|
|
|
// lockDelay is used to mark certain locks as unacquirable.
|
|
// When a lock is forcefully released (failing health
|
|
// check, destroyed session, etc), it is subject to the LockDelay
|
|
// impossed by the session. This prevents another session from
|
|
// acquiring the lock for some period of time as a protection against
|
|
// split-brains. This is inspired by the lock-delay in Chubby.
|
|
// Because this relies on wall-time, we cannot assume all peers
|
|
// perceive time as flowing uniformly. This means KVSLock MUST ignore
|
|
// lockDelay, since the lockDelay may have expired on the leader,
|
|
// but not on the follower. Rejecting the lock could result in
|
|
// inconsistencies in the FSMs due to the rate time progresses. Instead,
|
|
// only the opinion of the leader is respected, and the Raft log
|
|
// is never questioned.
|
|
lockDelay map[string]time.Time
|
|
lockDelayLock sync.RWMutex
|
|
|
|
// GC is when we create tombstones to track their time-to-live.
|
|
// The GC is consumed upstream to manage clearing of tombstones.
|
|
gc *TombstoneGC
|
|
}
|
|
|
|
// StateSnapshot is used to provide a point-in-time snapshot
|
|
// It works by starting a readonly transaction against all tables.
|
|
type StateSnapshot struct {
|
|
store *StateStore
|
|
tx *MDBTxn
|
|
lastIndex uint64
|
|
}
|
|
|
|
// sessionCheck is used to create a many-to-one table such
|
|
// that each check registered by a session can be mapped back
|
|
// to the session row.
|
|
type sessionCheck struct {
|
|
Node string
|
|
CheckID string
|
|
Session string
|
|
}
|
|
|
|
// Close is used to abort the transaction and allow for cleanup
|
|
func (s *StateSnapshot) Close() error {
|
|
s.tx.Abort()
|
|
return nil
|
|
}
|
|
|
|
// NewStateStore is used to create a new state store
|
|
func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) {
|
|
// Create a new temp dir
|
|
path, err := ioutil.TempDir("", "consul")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return NewStateStorePath(gc, path, logOutput)
|
|
}
|
|
|
|
// NewStateStorePath is used to create a new state store at a given path
|
|
// The path is cleared on closing.
|
|
func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) {
|
|
// Open the env
|
|
env, err := mdb.NewEnv()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s := &StateStore{
|
|
logger: log.New(logOutput, "", log.LstdFlags),
|
|
path: path,
|
|
env: env,
|
|
watch: make(map[*MDBTable]*NotifyGroup),
|
|
kvWatch: radix.New(),
|
|
lockDelay: make(map[string]time.Time),
|
|
gc: gc,
|
|
}
|
|
|
|
// Ensure we can initialize
|
|
if err := s.initialize(); err != nil {
|
|
env.Close()
|
|
os.RemoveAll(path)
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// Close is used to safely shutdown the state store
|
|
func (s *StateStore) Close() error {
|
|
s.env.Close()
|
|
os.RemoveAll(s.path)
|
|
return nil
|
|
}
|
|
|
|
// initialize is used to setup the store for use
|
|
func (s *StateStore) initialize() error {
|
|
// Setup the Env first
|
|
if err := s.env.SetMaxDBs(mdb.DBI(32)); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set the maximum db size based on 32/64bit. Since we are
|
|
// doing an mmap underneath, we need to limit our use of virtual
|
|
// address space on 32bit, but don't have to care on 64bit.
|
|
dbSize := dbMaxMapSize32bit
|
|
if runtime.GOARCH == "amd64" {
|
|
dbSize = dbMaxMapSize64bit
|
|
}
|
|
|
|
// Increase the maximum map size
|
|
if err := s.env.SetMapSize(dbSize); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Increase the maximum number of concurrent readers
|
|
// TODO: Block transactions if we could exceed dbMaxReaders
|
|
if err := s.env.SetMaxReaders(dbMaxReaders); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Optimize our flags for speed over safety, since the Raft log + snapshots
|
|
// are durable. We treat this as an ephemeral in-memory DB, since we nuke
|
|
// the data anyways.
|
|
var flags uint = mdb.NOMETASYNC | mdb.NOSYNC | mdb.NOTLS
|
|
if err := s.env.Open(s.path, flags, 0755); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Tables use a generic struct encoder
|
|
encoder := func(obj interface{}) []byte {
|
|
buf, err := structs.Encode(255, obj)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return buf[1:]
|
|
}
|
|
|
|
// Setup our tables
|
|
s.nodeTable = &MDBTable{
|
|
Name: dbNodes,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"Node"},
|
|
CaseInsensitive: true,
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(structs.Node)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
s.serviceTable = &MDBTable{
|
|
Name: dbServices,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"Node", "ServiceID"},
|
|
},
|
|
"service": &MDBIndex{
|
|
AllowBlank: true,
|
|
Fields: []string{"ServiceName"},
|
|
CaseInsensitive: true,
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(structs.ServiceNode)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
s.checkTable = &MDBTable{
|
|
Name: dbChecks,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"Node", "CheckID"},
|
|
},
|
|
"status": &MDBIndex{
|
|
Fields: []string{"Status"},
|
|
},
|
|
"service": &MDBIndex{
|
|
AllowBlank: true,
|
|
Fields: []string{"ServiceName"},
|
|
},
|
|
"node": &MDBIndex{
|
|
AllowBlank: true,
|
|
Fields: []string{"Node", "ServiceID"},
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(structs.HealthCheck)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
s.kvsTable = &MDBTable{
|
|
Name: dbKVS,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"Key"},
|
|
},
|
|
"id_prefix": &MDBIndex{
|
|
Virtual: true,
|
|
RealIndex: "id",
|
|
Fields: []string{"Key"},
|
|
IdxFunc: DefaultIndexPrefixFunc,
|
|
},
|
|
"session": &MDBIndex{
|
|
AllowBlank: true,
|
|
Fields: []string{"Session"},
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(structs.DirEntry)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
s.tombstoneTable = &MDBTable{
|
|
Name: dbTombstone,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"Key"},
|
|
},
|
|
"id_prefix": &MDBIndex{
|
|
Virtual: true,
|
|
RealIndex: "id",
|
|
Fields: []string{"Key"},
|
|
IdxFunc: DefaultIndexPrefixFunc,
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(structs.DirEntry)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
s.sessionTable = &MDBTable{
|
|
Name: dbSessions,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"ID"},
|
|
},
|
|
"node": &MDBIndex{
|
|
AllowBlank: true,
|
|
Fields: []string{"Node"},
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(structs.Session)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
s.sessionCheckTable = &MDBTable{
|
|
Name: dbSessionChecks,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"Node", "CheckID", "Session"},
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(sessionCheck)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
s.aclTable = &MDBTable{
|
|
Name: dbACLs,
|
|
Indexes: map[string]*MDBIndex{
|
|
"id": &MDBIndex{
|
|
Unique: true,
|
|
Fields: []string{"ID"},
|
|
},
|
|
},
|
|
Decoder: func(buf []byte) interface{} {
|
|
out := new(structs.ACL)
|
|
if err := structs.Decode(buf, out); err != nil {
|
|
panic(err)
|
|
}
|
|
return out
|
|
},
|
|
}
|
|
|
|
// Store the set of tables
|
|
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable,
|
|
s.kvsTable, s.tombstoneTable, s.sessionTable, s.sessionCheckTable,
|
|
s.aclTable}
|
|
for _, table := range s.tables {
|
|
table.Env = s.env
|
|
table.Encoder = encoder
|
|
if err := table.Init(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Setup a notification group per table
|
|
s.watch[table] = &NotifyGroup{}
|
|
}
|
|
|
|
// Setup the query tables
|
|
s.queryTables = map[string]MDBTables{
|
|
"Nodes": MDBTables{s.nodeTable},
|
|
"Services": MDBTables{s.serviceTable},
|
|
"ServiceNodes": MDBTables{s.nodeTable, s.serviceTable},
|
|
"NodeServices": MDBTables{s.nodeTable, s.serviceTable},
|
|
"ChecksInState": MDBTables{s.checkTable},
|
|
"NodeChecks": MDBTables{s.checkTable},
|
|
"ServiceChecks": MDBTables{s.checkTable},
|
|
"CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
|
|
"NodeInfo": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
|
|
"NodeDump": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
|
|
"SessionGet": MDBTables{s.sessionTable},
|
|
"SessionList": MDBTables{s.sessionTable},
|
|
"NodeSessions": MDBTables{s.sessionTable},
|
|
"ACLGet": MDBTables{s.aclTable},
|
|
"ACLList": MDBTables{s.aclTable},
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Watch is used to subscribe a channel to a set of MDBTables
|
|
func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
|
|
for _, t := range tables {
|
|
s.watch[t].Wait(notify)
|
|
}
|
|
}
|
|
|
|
// WatchKV is used to subscribe a channel to changes in KV data
|
|
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
|
|
s.kvWatchLock.Lock()
|
|
defer s.kvWatchLock.Unlock()
|
|
|
|
// Check for an existing notify group
|
|
if raw, ok := s.kvWatch.Get(prefix); ok {
|
|
grp := raw.(*NotifyGroup)
|
|
grp.Wait(notify)
|
|
return
|
|
}
|
|
|
|
// Create new notify group
|
|
grp := &NotifyGroup{}
|
|
grp.Wait(notify)
|
|
s.kvWatch.Insert(prefix, grp)
|
|
}
|
|
|
|
// notifyKV is used to notify any KV listeners of a change
|
|
// on a prefix
|
|
func (s *StateStore) notifyKV(path string, prefix bool) {
|
|
s.kvWatchLock.Lock()
|
|
defer s.kvWatchLock.Unlock()
|
|
|
|
var toDelete []string
|
|
fn := func(s string, v interface{}) bool {
|
|
group := v.(*NotifyGroup)
|
|
group.Notify()
|
|
if s != "" {
|
|
toDelete = append(toDelete, s)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Invoke any watcher on the path downward to the key.
|
|
s.kvWatch.WalkPath(path, fn)
|
|
|
|
// If the entire prefix may be affected (e.g. delete tree),
|
|
// invoke the entire prefix
|
|
if prefix {
|
|
s.kvWatch.WalkPrefix(path, fn)
|
|
}
|
|
|
|
// Delete the old watch groups
|
|
for i := len(toDelete) - 1; i >= 0; i-- {
|
|
s.kvWatch.Delete(toDelete[i])
|
|
}
|
|
}
|
|
|
|
// QueryTables returns the Tables that are queried for a given query
|
|
func (s *StateStore) QueryTables(q string) MDBTables {
|
|
return s.queryTables[q]
|
|
}
|
|
|
|
// EnsureRegistration is used to make sure a node, service, and check registration
|
|
// is performed within a single transaction to avoid race conditions on state updates.
|
|
func (s *StateStore) EnsureRegistration(index uint64, req *structs.RegisterRequest) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Ensure the node
|
|
node := structs.Node{req.Node, req.Address}
|
|
if err := s.ensureNodeTxn(index, node, tx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure the service if provided
|
|
if req.Service != nil {
|
|
if err := s.ensureServiceTxn(index, req.Node, req.Service, tx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Ensure the check(s), if provided
|
|
if req.Check != nil {
|
|
if err := s.ensureCheckTxn(index, req.Check, tx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
for _, check := range req.Checks {
|
|
if err := s.ensureCheckTxn(index, check, tx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Commit as one unit
|
|
return tx.Commit()
|
|
}
|
|
|
|
// EnsureNode is used to ensure a given node exists, with the provided address
|
|
func (s *StateStore) EnsureNode(index uint64, node structs.Node) error {
|
|
tx, err := s.nodeTable.StartTxn(false, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Abort()
|
|
if err := s.ensureNodeTxn(index, node, tx); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// ensureNodeTxn is used to ensure a given node exists, with the provided address
|
|
// within a given txn
|
|
func (s *StateStore) ensureNodeTxn(index uint64, node structs.Node, tx *MDBTxn) error {
|
|
if err := s.nodeTable.InsertTxn(tx, node); err != nil {
|
|
return err
|
|
}
|
|
if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.nodeTable].Notify() })
|
|
return nil
|
|
}
|
|
|
|
// GetNode returns all the address of the known and if it was found
|
|
func (s *StateStore) GetNode(name string) (uint64, bool, string) {
|
|
idx, res, err := s.nodeTable.Get("id", name)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Error during node lookup: %v", err)
|
|
return 0, false, ""
|
|
}
|
|
if len(res) == 0 {
|
|
return idx, false, ""
|
|
}
|
|
return idx, true, res[0].(*structs.Node).Address
|
|
}
|
|
|
|
// GetNodes returns all the known nodes, the slice alternates between
|
|
// the node name and address
|
|
func (s *StateStore) Nodes() (uint64, structs.Nodes) {
|
|
idx, res, err := s.nodeTable.Get("id")
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Error getting nodes: %v", err)
|
|
}
|
|
results := make([]structs.Node, len(res))
|
|
for i, r := range res {
|
|
results[i] = *r.(*structs.Node)
|
|
}
|
|
return idx, results
|
|
}
|
|
|
|
// EnsureService is used to ensure a given node exposes a service
|
|
func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeService) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
if err := s.ensureServiceTxn(index, node, ns, tx); err != nil {
|
|
return nil
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// ensureServiceTxn is used to ensure a given node exposes a service in a transaction
|
|
func (s *StateStore) ensureServiceTxn(index uint64, node string, ns *structs.NodeService, tx *MDBTxn) error {
|
|
// Ensure the node exists
|
|
res, err := s.nodeTable.GetTxn(tx, "id", node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(res) == 0 {
|
|
return fmt.Errorf("Missing node registration")
|
|
}
|
|
|
|
// Create the entry
|
|
entry := structs.ServiceNode{
|
|
Node: node,
|
|
ServiceID: ns.ID,
|
|
ServiceName: ns.Service,
|
|
ServiceTags: ns.Tags,
|
|
ServiceAddress: ns.Address,
|
|
ServicePort: ns.Port,
|
|
}
|
|
|
|
// Ensure the service entry is set
|
|
if err := s.serviceTable.InsertTxn(tx, &entry); err != nil {
|
|
return err
|
|
}
|
|
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.serviceTable].Notify() })
|
|
return nil
|
|
}
|
|
|
|
// NodeServices is used to return all the services of a given node
|
|
func (s *StateStore) NodeServices(name string) (uint64, *structs.NodeServices) {
|
|
tables := s.queryTables["NodeServices"]
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
return s.parseNodeServices(tables, tx, name)
|
|
}
|
|
|
|
// parseNodeServices is used to get the services belonging to a
|
|
// node, using a given txn
|
|
func (s *StateStore) parseNodeServices(tables MDBTables, tx *MDBTxn, name string) (uint64, *structs.NodeServices) {
|
|
ns := &structs.NodeServices{
|
|
Services: make(map[string]*structs.NodeService),
|
|
}
|
|
|
|
// Get the maximum index
|
|
index, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to get last index: %v", err))
|
|
}
|
|
|
|
// Get the node first
|
|
res, err := s.nodeTable.GetTxn(tx, "id", name)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get node: %v", err)
|
|
}
|
|
if len(res) == 0 {
|
|
return index, nil
|
|
}
|
|
|
|
// Set the address
|
|
node := res[0].(*structs.Node)
|
|
ns.Node = *node
|
|
|
|
// Get the services
|
|
res, err = s.serviceTable.GetTxn(tx, "id", name)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get node '%s' services: %v", name, err)
|
|
}
|
|
|
|
// Add each service
|
|
for _, r := range res {
|
|
service := r.(*structs.ServiceNode)
|
|
srv := &structs.NodeService{
|
|
ID: service.ServiceID,
|
|
Service: service.ServiceName,
|
|
Tags: service.ServiceTags,
|
|
Address: service.ServiceAddress,
|
|
Port: service.ServicePort,
|
|
}
|
|
ns.Services[srv.ID] = srv
|
|
}
|
|
return index, ns
|
|
}
|
|
|
|
// DeleteNodeService is used to delete a node service
|
|
func (s *StateStore) DeleteNodeService(index uint64, node, id string) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
if n, err := s.serviceTable.DeleteTxn(tx, "id", node, id); err != nil {
|
|
return err
|
|
} else if n > 0 {
|
|
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.serviceTable].Notify() })
|
|
}
|
|
|
|
// Invalidate any sessions using these checks
|
|
checks, err := s.checkTable.GetTxn(tx, "node", node, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, c := range checks {
|
|
check := c.(*structs.HealthCheck)
|
|
if err := s.invalidateCheck(index, tx, node, check.CheckID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if n, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil {
|
|
return err
|
|
} else if n > 0 {
|
|
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.checkTable].Notify() })
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// DeleteNode is used to delete a node and all it's services
|
|
func (s *StateStore) DeleteNode(index uint64, node string) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Invalidate any sessions held by the node
|
|
if err := s.invalidateNode(index, tx, node); err != nil {
|
|
return err
|
|
}
|
|
|
|
if n, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil {
|
|
return err
|
|
} else if n > 0 {
|
|
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.serviceTable].Notify() })
|
|
}
|
|
if n, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil {
|
|
return err
|
|
} else if n > 0 {
|
|
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.checkTable].Notify() })
|
|
}
|
|
if n, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil {
|
|
return err
|
|
} else if n > 0 {
|
|
if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.nodeTable].Notify() })
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// Services is used to return all the services with a list of associated tags
|
|
func (s *StateStore) Services() (uint64, map[string][]string) {
|
|
services := make(map[string][]string)
|
|
idx, res, err := s.serviceTable.Get("id")
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get services: %v", err)
|
|
return idx, services
|
|
}
|
|
for _, r := range res {
|
|
srv := r.(*structs.ServiceNode)
|
|
tags, ok := services[srv.ServiceName]
|
|
if !ok {
|
|
services[srv.ServiceName] = make([]string, 0)
|
|
}
|
|
|
|
for _, tag := range srv.ServiceTags {
|
|
if !strContains(tags, tag) {
|
|
tags = append(tags, tag)
|
|
services[srv.ServiceName] = tags
|
|
}
|
|
}
|
|
}
|
|
return idx, services
|
|
}
|
|
|
|
// ServiceNodes returns the nodes associated with a given service
|
|
func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) {
|
|
tables := s.queryTables["ServiceNodes"]
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to get last index: %v", err))
|
|
}
|
|
|
|
res, err := s.serviceTable.GetTxn(tx, "service", service)
|
|
return idx, s.parseServiceNodes(tx, s.nodeTable, res, err)
|
|
}
|
|
|
|
// ServiceTagNodes returns the nodes associated with a given service matching a tag
|
|
func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes) {
|
|
tables := s.queryTables["ServiceNodes"]
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to get last index: %v", err))
|
|
}
|
|
|
|
res, err := s.serviceTable.GetTxn(tx, "service", service)
|
|
res = serviceTagFilter(res, tag)
|
|
return idx, s.parseServiceNodes(tx, s.nodeTable, res, err)
|
|
}
|
|
|
|
// serviceTagFilter is used to filter a list of *structs.ServiceNode which do
|
|
// not have the specified tag
|
|
func serviceTagFilter(l []interface{}, tag string) []interface{} {
|
|
n := len(l)
|
|
for i := 0; i < n; i++ {
|
|
srv := l[i].(*structs.ServiceNode)
|
|
if !strContains(ToLowerList(srv.ServiceTags), strings.ToLower(tag)) {
|
|
l[i], l[n-1] = l[n-1], nil
|
|
i--
|
|
n--
|
|
}
|
|
}
|
|
return l[:n]
|
|
}
|
|
|
|
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
|
|
func (s *StateStore) parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interface{}, err error) structs.ServiceNodes {
|
|
nodes := make(structs.ServiceNodes, len(res))
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get service nodes: %v", err)
|
|
return nodes
|
|
}
|
|
|
|
for i, r := range res {
|
|
srv := r.(*structs.ServiceNode)
|
|
|
|
// Get the address of the node
|
|
nodeRes, err := table.GetTxn(tx, "id", srv.Node)
|
|
if err != nil || len(nodeRes) != 1 {
|
|
s.logger.Printf("[ERR] consul.state: Failed to join service node %#v with node: %v", *srv, err)
|
|
continue
|
|
}
|
|
srv.Address = nodeRes[0].(*structs.Node).Address
|
|
|
|
nodes[i] = *srv
|
|
}
|
|
|
|
return nodes
|
|
}
|
|
|
|
// EnsureCheck is used to create a check or updates it's state
|
|
func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
if err := s.ensureCheckTxn(index, check, tx); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// ensureCheckTxn is used to create a check or updates it's state in a transaction
|
|
func (s *StateStore) ensureCheckTxn(index uint64, check *structs.HealthCheck, tx *MDBTxn) error {
|
|
// Ensure we have a status
|
|
if check.Status == "" {
|
|
check.Status = structs.HealthCritical
|
|
}
|
|
|
|
// Ensure the node exists
|
|
res, err := s.nodeTable.GetTxn(tx, "id", check.Node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(res) == 0 {
|
|
return fmt.Errorf("Missing node registration")
|
|
}
|
|
|
|
// Ensure the service exists if specified
|
|
if check.ServiceID != "" {
|
|
res, err = s.serviceTable.GetTxn(tx, "id", check.Node, check.ServiceID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(res) == 0 {
|
|
return fmt.Errorf("Missing service registration")
|
|
}
|
|
// Ensure we set the correct service
|
|
srv := res[0].(*structs.ServiceNode)
|
|
check.ServiceName = srv.ServiceName
|
|
}
|
|
|
|
// Invalidate any sessions if status is critical
|
|
if check.Status == structs.HealthCritical {
|
|
err := s.invalidateCheck(index, tx, check.Node, check.CheckID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Ensure the check is set
|
|
if err := s.checkTable.InsertTxn(tx, check); err != nil {
|
|
return err
|
|
}
|
|
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.checkTable].Notify() })
|
|
return nil
|
|
}
|
|
|
|
// DeleteNodeCheck is used to delete a node health check
|
|
func (s *StateStore) DeleteNodeCheck(index uint64, node, id string) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Invalidate any sessions held by this check
|
|
if err := s.invalidateCheck(index, tx, node, id); err != nil {
|
|
return err
|
|
}
|
|
|
|
if n, err := s.checkTable.DeleteTxn(tx, "id", node, id); err != nil {
|
|
return err
|
|
} else if n > 0 {
|
|
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.checkTable].Notify() })
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// NodeChecks is used to get all the checks for a node
|
|
func (s *StateStore) NodeChecks(node string) (uint64, structs.HealthChecks) {
|
|
return s.parseHealthChecks(s.checkTable.Get("id", node))
|
|
}
|
|
|
|
// ServiceChecks is used to get all the checks for a service
|
|
func (s *StateStore) ServiceChecks(service string) (uint64, structs.HealthChecks) {
|
|
return s.parseHealthChecks(s.checkTable.Get("service", service))
|
|
}
|
|
|
|
// CheckInState is used to get all the checks for a service in a given state
|
|
func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks) {
|
|
var idx uint64
|
|
var res []interface{}
|
|
var err error
|
|
if state == structs.HealthAny {
|
|
idx, res, err = s.checkTable.Get("id")
|
|
} else {
|
|
idx, res, err = s.checkTable.Get("status", state)
|
|
}
|
|
return s.parseHealthChecks(idx, res, err)
|
|
}
|
|
|
|
// parseHealthChecks is used to handle the resutls of a Get against
|
|
// the checkTable
|
|
func (s *StateStore) parseHealthChecks(idx uint64, res []interface{}, err error) (uint64, structs.HealthChecks) {
|
|
results := make([]*structs.HealthCheck, len(res))
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get health checks: %v", err)
|
|
return idx, results
|
|
}
|
|
for i, r := range res {
|
|
results[i] = r.(*structs.HealthCheck)
|
|
}
|
|
return idx, results
|
|
}
|
|
|
|
// CheckServiceNodes returns the nodes associated with a given service, along
|
|
// with any associated check
|
|
func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes) {
|
|
tables := s.queryTables["CheckServiceNodes"]
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to get last index: %v", err))
|
|
}
|
|
|
|
res, err := s.serviceTable.GetTxn(tx, "service", service)
|
|
return idx, s.parseCheckServiceNodes(tx, res, err)
|
|
}
|
|
|
|
// CheckServiceNodes returns the nodes associated with a given service, along
|
|
// with any associated checks
|
|
func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes) {
|
|
tables := s.queryTables["CheckServiceNodes"]
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to get last index: %v", err))
|
|
}
|
|
|
|
res, err := s.serviceTable.GetTxn(tx, "service", service)
|
|
res = serviceTagFilter(res, tag)
|
|
return idx, s.parseCheckServiceNodes(tx, res, err)
|
|
}
|
|
|
|
// parseCheckServiceNodes parses results CheckServiceNodes and CheckServiceTagNodes
|
|
func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err error) structs.CheckServiceNodes {
|
|
nodes := make(structs.CheckServiceNodes, len(res))
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get service nodes: %v", err)
|
|
return nodes
|
|
}
|
|
|
|
for i, r := range res {
|
|
srv := r.(*structs.ServiceNode)
|
|
|
|
// Get the node
|
|
nodeRes, err := s.nodeTable.GetTxn(tx, "id", srv.Node)
|
|
if err != nil || len(nodeRes) != 1 {
|
|
s.logger.Printf("[ERR] consul.state: Failed to join service node %#v with node: %v", *srv, err)
|
|
continue
|
|
}
|
|
|
|
// Get any associated checks of the service
|
|
res, err := s.checkTable.GetTxn(tx, "node", srv.Node, srv.ServiceID)
|
|
_, checks := s.parseHealthChecks(0, res, err)
|
|
|
|
// Get any checks of the node, not assciated with any service
|
|
res, err = s.checkTable.GetTxn(tx, "node", srv.Node, "")
|
|
_, nodeChecks := s.parseHealthChecks(0, res, err)
|
|
checks = append(checks, nodeChecks...)
|
|
|
|
// Setup the node
|
|
nodes[i].Node = *nodeRes[0].(*structs.Node)
|
|
nodes[i].Service = structs.NodeService{
|
|
ID: srv.ServiceID,
|
|
Service: srv.ServiceName,
|
|
Tags: srv.ServiceTags,
|
|
Address: srv.ServiceAddress,
|
|
Port: srv.ServicePort,
|
|
}
|
|
nodes[i].Checks = checks
|
|
}
|
|
|
|
return nodes
|
|
}
|
|
|
|
// NodeInfo is used to generate the full info about a node.
|
|
func (s *StateStore) NodeInfo(node string) (uint64, structs.NodeDump) {
|
|
tables := s.queryTables["NodeInfo"]
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to get last index: %v", err))
|
|
}
|
|
|
|
res, err := s.nodeTable.GetTxn(tx, "id", node)
|
|
return idx, s.parseNodeInfo(tx, res, err)
|
|
}
|
|
|
|
// NodeDump is used to generate the NodeInfo for all nodes. This is very expensive,
|
|
// and should generally be avoided for programatic access.
|
|
func (s *StateStore) NodeDump() (uint64, structs.NodeDump) {
|
|
tables := s.queryTables["NodeDump"]
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to get last index: %v", err))
|
|
}
|
|
|
|
res, err := s.nodeTable.GetTxn(tx, "id")
|
|
return idx, s.parseNodeInfo(tx, res, err)
|
|
}
|
|
|
|
// parseNodeInfo is used to scan over the results of a node
|
|
// iteration and generate a NodeDump
|
|
func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) structs.NodeDump {
|
|
dump := make(structs.NodeDump, 0, len(res))
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get nodes: %v", err)
|
|
return dump
|
|
}
|
|
|
|
for _, r := range res {
|
|
// Copy the address and node
|
|
node := r.(*structs.Node)
|
|
info := &structs.NodeInfo{
|
|
Node: node.Node,
|
|
Address: node.Address,
|
|
}
|
|
|
|
// Get any services of the node
|
|
res, err = s.serviceTable.GetTxn(tx, "id", node.Node)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get node services: %v", err)
|
|
}
|
|
info.Services = make([]*structs.NodeService, 0, len(res))
|
|
for _, r := range res {
|
|
service := r.(*structs.ServiceNode)
|
|
srv := &structs.NodeService{
|
|
ID: service.ServiceID,
|
|
Service: service.ServiceName,
|
|
Tags: service.ServiceTags,
|
|
Address: service.ServiceAddress,
|
|
Port: service.ServicePort,
|
|
}
|
|
info.Services = append(info.Services, srv)
|
|
}
|
|
|
|
// Get any checks of the node
|
|
res, err = s.checkTable.GetTxn(tx, "node", node.Node)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: Failed to get node checks: %v", err)
|
|
}
|
|
info.Checks = make([]*structs.HealthCheck, 0, len(res))
|
|
for _, r := range res {
|
|
chk := r.(*structs.HealthCheck)
|
|
info.Checks = append(info.Checks, chk)
|
|
}
|
|
|
|
// Add the node info
|
|
dump = append(dump, info)
|
|
}
|
|
return dump
|
|
}
|
|
|
|
// KVSSet is used to create or update a KV entry
|
|
func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error {
|
|
_, err := s.kvsSet(index, d, kvSet)
|
|
return err
|
|
}
|
|
|
|
// KVSRestore is used to restore a DirEntry. It should only be used when
|
|
// doing a restore, otherwise KVSSet should be used.
|
|
func (s *StateStore) KVSRestore(d *structs.DirEntry) error {
|
|
// Start a new txn
|
|
tx, err := s.kvsTable.StartTxn(false, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
|
return err
|
|
}
|
|
if err := s.kvsTable.SetMaxLastIndexTxn(tx, d.ModifyIndex); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// KVSGet is used to get a KV entry
|
|
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
|
|
idx, res, err := s.kvsTable.Get("id", key)
|
|
var d *structs.DirEntry
|
|
if len(res) > 0 {
|
|
d = res[0].(*structs.DirEntry)
|
|
}
|
|
return idx, d, err
|
|
}
|
|
|
|
// KVSList is used to list all KV entries with a prefix
|
|
func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) {
|
|
tables := MDBTables{s.kvsTable, s.tombstoneTable}
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
|
|
res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
ents := make(structs.DirEntries, len(res))
|
|
for idx, r := range res {
|
|
ents[idx] = r.(*structs.DirEntry)
|
|
}
|
|
|
|
// Check for the higest index in the tombstone table
|
|
var maxIndex uint64
|
|
res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix)
|
|
for _, r := range res {
|
|
ent := r.(*structs.DirEntry)
|
|
if ent.ModifyIndex > maxIndex {
|
|
maxIndex = ent.ModifyIndex
|
|
}
|
|
}
|
|
|
|
return maxIndex, idx, ents, err
|
|
}
|
|
|
|
// KVSListKeys is used to list keys with a prefix, and up to a given seperator
|
|
func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) {
|
|
tables := MDBTables{s.kvsTable, s.tombstoneTable}
|
|
tx, err := tables.StartTxn(true)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
idx, err := s.kvsTable.LastIndexTxn(tx)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
// Ensure a non-zero index
|
|
if idx == 0 {
|
|
// Must provide non-zero index to prevent blocking
|
|
// Index 1 is impossible anyways (due to Raft internals)
|
|
idx = 1
|
|
}
|
|
|
|
// Aggregate the stream
|
|
stream := make(chan interface{}, 128)
|
|
streamTomb := make(chan interface{}, 128)
|
|
done := make(chan struct{})
|
|
var keys []string
|
|
var maxIndex uint64
|
|
go func() {
|
|
prefixLen := len(prefix)
|
|
sepLen := len(seperator)
|
|
last := ""
|
|
for raw := range stream {
|
|
ent := raw.(*structs.DirEntry)
|
|
after := ent.Key[prefixLen:]
|
|
|
|
// Update the hightest index we've seen
|
|
if ent.ModifyIndex > maxIndex {
|
|
maxIndex = ent.ModifyIndex
|
|
}
|
|
|
|
// If there is no seperator, always accumulate
|
|
if sepLen == 0 {
|
|
keys = append(keys, ent.Key)
|
|
continue
|
|
}
|
|
|
|
// Check for the seperator
|
|
if idx := strings.Index(after, seperator); idx >= 0 {
|
|
toSep := ent.Key[:prefixLen+idx+sepLen]
|
|
if last != toSep {
|
|
keys = append(keys, toSep)
|
|
last = toSep
|
|
}
|
|
} else {
|
|
keys = append(keys, ent.Key)
|
|
}
|
|
}
|
|
|
|
// Handle the tombstones for any index updates
|
|
for raw := range streamTomb {
|
|
ent := raw.(*structs.DirEntry)
|
|
if ent.ModifyIndex > maxIndex {
|
|
maxIndex = ent.ModifyIndex
|
|
}
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
// Start the stream, and wait for completion
|
|
if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil {
|
|
return 0, nil, err
|
|
}
|
|
if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil {
|
|
return 0, nil, err
|
|
}
|
|
<-done
|
|
|
|
// Use the maxIndex if we have any keys
|
|
if maxIndex != 0 {
|
|
idx = maxIndex
|
|
}
|
|
return idx, keys, nil
|
|
}
|
|
|
|
// KVSDelete is used to delete a KVS entry
|
|
func (s *StateStore) KVSDelete(index uint64, key string) error {
|
|
return s.kvsDeleteWithIndex(index, "id", key)
|
|
}
|
|
|
|
// KVSDeleteCheckAndSet is used to perform an atomic delete check-and-set
|
|
func (s *StateStore) KVSDeleteCheckAndSet(index uint64, key string, casIndex uint64) (bool, error) {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Get the existing node
|
|
res, err := s.kvsTable.GetTxn(tx, "id", key)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Get the existing node if any
|
|
var exist *structs.DirEntry
|
|
if len(res) > 0 {
|
|
exist = res[0].(*structs.DirEntry)
|
|
}
|
|
|
|
// Use the casIndex as the constraint. A modify time of 0 means
|
|
// we are doign a delete-if-not-exists (odd...), while any other
|
|
// value means we expect that modify time.
|
|
if casIndex == 0 {
|
|
return exist == nil, nil
|
|
} else if casIndex > 0 && (exist == nil || exist.ModifyIndex != casIndex) {
|
|
return false, nil
|
|
}
|
|
|
|
// Do the actual delete
|
|
if err := s.kvsDeleteWithIndexTxn(index, tx, "id", key); err != nil {
|
|
return false, err
|
|
}
|
|
return true, tx.Commit()
|
|
}
|
|
|
|
// KVSDeleteTree is used to delete all keys with a given prefix
|
|
func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
|
|
if prefix == "" {
|
|
return s.kvsDeleteWithIndex(index, "id")
|
|
}
|
|
return s.kvsDeleteWithIndex(index, "id_prefix", prefix)
|
|
}
|
|
|
|
// kvsDeleteWithIndex does a delete with either the id or id_prefix
|
|
func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Abort()
|
|
if err := s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// kvsDeleteWithIndexTxn does a delete within an existing transaction
|
|
func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error {
|
|
num := 0
|
|
for {
|
|
// Get some number of entries to delete
|
|
pairs, err := s.kvsTable.GetTxnLimit(tx, 128, tableIndex, parts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create the tombstones and delete
|
|
for _, raw := range pairs {
|
|
ent := raw.(*structs.DirEntry)
|
|
ent.ModifyIndex = index // Update the index
|
|
ent.Value = nil // Reduce storage required
|
|
ent.Session = ""
|
|
if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil {
|
|
return err
|
|
}
|
|
if num, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil {
|
|
return err
|
|
} else if num != 1 {
|
|
return fmt.Errorf("Failed to delete key '%s'", ent.Key)
|
|
}
|
|
}
|
|
|
|
// Increment the total number
|
|
num += len(pairs)
|
|
if len(pairs) == 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
if num > 0 {
|
|
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() {
|
|
// Trigger the most fine grained notifications if possible
|
|
switch {
|
|
case len(parts) == 0:
|
|
s.notifyKV("", true)
|
|
case tableIndex == "id":
|
|
s.notifyKV(parts[0], false)
|
|
case tableIndex == "id_prefix":
|
|
s.notifyKV(parts[0], true)
|
|
default:
|
|
s.notifyKV("", true)
|
|
}
|
|
if s.gc != nil {
|
|
// If GC is configured, then we hint that this index
|
|
// required expiration.
|
|
s.gc.Hint(index)
|
|
}
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// KVSCheckAndSet is used to perform an atomic check-and-set
|
|
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
|
|
return s.kvsSet(index, d, kvCAS)
|
|
}
|
|
|
|
// KVSLock works like KVSSet but only writes if the lock can be acquired
|
|
func (s *StateStore) KVSLock(index uint64, d *structs.DirEntry) (bool, error) {
|
|
return s.kvsSet(index, d, kvLock)
|
|
}
|
|
|
|
// KVSUnlock works like KVSSet but only writes if the lock can be unlocked
|
|
func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) {
|
|
return s.kvsSet(index, d, kvUnlock)
|
|
}
|
|
|
|
// KVSLockDelay returns the expiration time of a key lock delay. A key may
|
|
// have a lock delay if it was unlocked due to a session invalidation instead
|
|
// of a graceful unlock. This must be checked on the leader node, and not in
|
|
// KVSLock due to the variability of clocks.
|
|
func (s *StateStore) KVSLockDelay(key string) time.Time {
|
|
s.lockDelayLock.RLock()
|
|
expires := s.lockDelay[key]
|
|
s.lockDelayLock.RUnlock()
|
|
return expires
|
|
}
|
|
|
|
// kvsSet is the internal setter
|
|
func (s *StateStore) kvsSet(
|
|
index uint64,
|
|
d *structs.DirEntry,
|
|
mode kvMode) (bool, error) {
|
|
// Start a new txn
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Get the existing node
|
|
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// Get the existing node if any
|
|
var exist *structs.DirEntry
|
|
if len(res) > 0 {
|
|
exist = res[0].(*structs.DirEntry)
|
|
}
|
|
|
|
// Use the ModifyIndex as the constraint. A modify of time of 0
|
|
// means we are doing a set-if-not-exists, while any other value
|
|
// means we expect that modify time.
|
|
if mode == kvCAS {
|
|
if d.ModifyIndex == 0 && exist != nil {
|
|
return false, nil
|
|
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
// If attempting to lock, check this is possible
|
|
if mode == kvLock {
|
|
// Verify we have a session
|
|
if d.Session == "" {
|
|
return false, fmt.Errorf("Missing session")
|
|
}
|
|
|
|
// Bail if it is already locked
|
|
if exist != nil && exist.Session != "" {
|
|
return false, nil
|
|
}
|
|
|
|
// Verify the session exists
|
|
res, err := s.sessionTable.GetTxn(tx, "id", d.Session)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if len(res) == 0 {
|
|
return false, fmt.Errorf("Invalid session")
|
|
}
|
|
|
|
// Update the lock index
|
|
if exist != nil {
|
|
exist.LockIndex++
|
|
exist.Session = d.Session
|
|
} else {
|
|
d.LockIndex = 1
|
|
}
|
|
}
|
|
|
|
// If attempting to unlock, verify the key exists and is held
|
|
if mode == kvUnlock {
|
|
if exist == nil || exist.Session != d.Session {
|
|
return false, nil
|
|
}
|
|
// Clear the session to unlock
|
|
exist.Session = ""
|
|
}
|
|
|
|
// Set the create and modify times
|
|
if exist == nil {
|
|
d.CreateIndex = index
|
|
} else {
|
|
d.CreateIndex = exist.CreateIndex
|
|
d.LockIndex = exist.LockIndex
|
|
d.Session = exist.Session
|
|
|
|
}
|
|
d.ModifyIndex = index
|
|
|
|
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
|
return false, err
|
|
}
|
|
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return false, err
|
|
}
|
|
tx.Defer(func() { s.notifyKV(d.Key, false) })
|
|
return true, tx.Commit()
|
|
}
|
|
|
|
// ReapTombstones is used to delete all the tombstones with a ModifyTime
|
|
// less than or equal to the given index. This is used to prevent unbounded
|
|
// storage growth of the tombstones.
|
|
func (s *StateStore) ReapTombstones(index uint64) error {
|
|
tx, err := s.tombstoneTable.StartTxn(false, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to start txn: %v", err)
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Scan the tombstone table for all the entries that are
|
|
// eligble for GC. This could be improved by indexing on
|
|
// ModifyTime and doing a less-than-equals scan, however
|
|
// we don't currently support numeric indexes internally.
|
|
// Luckily, this is a low frequency operation.
|
|
var toDelete []string
|
|
streamCh := make(chan interface{}, 128)
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
for raw := range streamCh {
|
|
ent := raw.(*structs.DirEntry)
|
|
if ent.ModifyIndex <= index {
|
|
toDelete = append(toDelete, ent.Key)
|
|
}
|
|
}
|
|
}()
|
|
if err := s.tombstoneTable.StreamTxn(streamCh, tx, "id"); err != nil {
|
|
s.logger.Printf("[ERR] consul.state: failed to scan tombstones: %v", err)
|
|
return fmt.Errorf("failed to scan tombstones: %v", err)
|
|
}
|
|
<-doneCh
|
|
|
|
// Delete each tombstone
|
|
if len(toDelete) > 0 {
|
|
s.logger.Printf("[DEBUG] consul.state: reaping %d tombstones up to %d", len(toDelete), index)
|
|
}
|
|
for _, key := range toDelete {
|
|
num, err := s.tombstoneTable.DeleteTxn(tx, "id", key)
|
|
if err != nil {
|
|
s.logger.Printf("[ERR] consul.state: failed to delete tombstone: %v", err)
|
|
return fmt.Errorf("failed to delete tombstone: %v", err)
|
|
}
|
|
if num != 1 {
|
|
return fmt.Errorf("failed to delete tombstone '%s'", key)
|
|
}
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// TombstoneRestore is used to restore a tombstone.
|
|
// It should only be used when doing a restore.
|
|
func (s *StateStore) TombstoneRestore(d *structs.DirEntry) error {
|
|
// Start a new txn
|
|
tx, err := s.tombstoneTable.StartTxn(false, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
if err := s.tombstoneTable.InsertTxn(tx, d); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// SessionCreate is used to create a new session. The
|
|
// ID will be populated on a successful return
|
|
func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error {
|
|
// Verify a Session ID is generated
|
|
if session.ID == "" {
|
|
return fmt.Errorf("Missing Session ID")
|
|
}
|
|
|
|
switch session.Behavior {
|
|
case "":
|
|
// Default behavior is Release for backwards compatibility
|
|
session.Behavior = structs.SessionKeysRelease
|
|
case structs.SessionKeysRelease:
|
|
case structs.SessionKeysDelete:
|
|
default:
|
|
return fmt.Errorf("Invalid Session Behavior setting '%s'", session.Behavior)
|
|
}
|
|
|
|
if session.TTL != "" {
|
|
ttl, err := time.ParseDuration(session.TTL)
|
|
if err != nil {
|
|
return fmt.Errorf("Invalid Session TTL '%s': %v", session.TTL, err)
|
|
}
|
|
|
|
if ttl != 0 && (ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax) {
|
|
return fmt.Errorf("Invalid Session TTL '%s', must be between [%v-%v]",
|
|
session.TTL, structs.SessionTTLMin, structs.SessionTTLMax)
|
|
}
|
|
}
|
|
|
|
// Assign the create index
|
|
session.CreateIndex = index
|
|
|
|
// Start the transaction
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Verify that the node exists
|
|
res, err := s.nodeTable.GetTxn(tx, "id", session.Node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(res) == 0 {
|
|
return fmt.Errorf("Missing node registration")
|
|
}
|
|
|
|
// Verify that the checks exist and are not critical
|
|
for _, checkId := range session.Checks {
|
|
res, err := s.checkTable.GetTxn(tx, "id", session.Node, checkId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(res) == 0 {
|
|
return fmt.Errorf("Missing check '%s' registration", checkId)
|
|
}
|
|
chk := res[0].(*structs.HealthCheck)
|
|
if chk.Status == structs.HealthCritical {
|
|
return fmt.Errorf("Check '%s' is in %s state", checkId, chk.Status)
|
|
}
|
|
}
|
|
|
|
// Insert the session
|
|
if err := s.sessionTable.InsertTxn(tx, session); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert the check mappings
|
|
sCheck := sessionCheck{Node: session.Node, Session: session.ID}
|
|
for _, checkID := range session.Checks {
|
|
sCheck.CheckID = checkID
|
|
if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Trigger the update notifications
|
|
if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.sessionTable].Notify() })
|
|
return tx.Commit()
|
|
}
|
|
|
|
// SessionRestore is used to restore a session. It should only be used when
|
|
// doing a restore, otherwise SessionCreate should be used.
|
|
func (s *StateStore) SessionRestore(session *structs.Session) error {
|
|
// Start the transaction
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Insert the session
|
|
if err := s.sessionTable.InsertTxn(tx, session); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert the check mappings
|
|
sCheck := sessionCheck{Node: session.Node, Session: session.ID}
|
|
for _, checkID := range session.Checks {
|
|
sCheck.CheckID = checkID
|
|
if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Trigger the update notifications
|
|
index := session.CreateIndex
|
|
if err := s.sessionTable.SetMaxLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.sessionTable].Notify() })
|
|
return tx.Commit()
|
|
}
|
|
|
|
// SessionGet is used to get a session entry
|
|
func (s *StateStore) SessionGet(id string) (uint64, *structs.Session, error) {
|
|
idx, res, err := s.sessionTable.Get("id", id)
|
|
var d *structs.Session
|
|
if len(res) > 0 {
|
|
d = res[0].(*structs.Session)
|
|
}
|
|
return idx, d, err
|
|
}
|
|
|
|
// SessionList is used to list all the open sessions
|
|
func (s *StateStore) SessionList() (uint64, []*structs.Session, error) {
|
|
idx, res, err := s.sessionTable.Get("id")
|
|
out := make([]*structs.Session, len(res))
|
|
for i, raw := range res {
|
|
out[i] = raw.(*structs.Session)
|
|
}
|
|
return idx, out, err
|
|
}
|
|
|
|
// NodeSessions is used to list all the open sessions for a node
|
|
func (s *StateStore) NodeSessions(node string) (uint64, []*structs.Session, error) {
|
|
idx, res, err := s.sessionTable.Get("node", node)
|
|
out := make([]*structs.Session, len(res))
|
|
for i, raw := range res {
|
|
out[i] = raw.(*structs.Session)
|
|
}
|
|
return idx, out, err
|
|
}
|
|
|
|
// SessionDestroy is used to destroy a session.
|
|
func (s *StateStore) SessionDestroy(index uint64, id string) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy",
|
|
id)
|
|
if err := s.invalidateSession(index, tx, id); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// invalideNode is used to invalide all sessions belonging to a node
|
|
// All tables should be locked in the tx.
|
|
func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error {
|
|
sessions, err := s.sessionTable.GetTxn(tx, "node", node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, sess := range sessions {
|
|
session := sess.(*structs.Session).ID
|
|
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation",
|
|
session, node)
|
|
if err := s.invalidateSession(index, tx, session); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// invalidateCheck is used to invalide all sessions belonging to a check
|
|
// All tables should be locked in the tx.
|
|
func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check string) error {
|
|
sessionChecks, err := s.sessionCheckTable.GetTxn(tx, "id", node, check)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, sc := range sessionChecks {
|
|
session := sc.(*sessionCheck).Session
|
|
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation",
|
|
session, check)
|
|
if err := s.invalidateSession(index, tx, session); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// invalidateSession is used to invalide a session within a given txn
|
|
// All tables should be locked in the tx.
|
|
func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) error {
|
|
// Get the session
|
|
res, err := s.sessionTable.GetTxn(tx, "id", id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Quit if this session does not exist
|
|
if len(res) == 0 {
|
|
return nil
|
|
}
|
|
session := res[0].(*structs.Session)
|
|
|
|
// Enforce the MaxLockDelay
|
|
delay := session.LockDelay
|
|
if delay > structs.MaxLockDelay {
|
|
delay = structs.MaxLockDelay
|
|
}
|
|
|
|
// Invalidate any held locks
|
|
if session.Behavior == structs.SessionKeysDelete {
|
|
if err := s.deleteLocks(index, tx, delay, id); err != nil {
|
|
return err
|
|
}
|
|
} else if err := s.invalidateLocks(index, tx, delay, id); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Nuke the session
|
|
if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete the check mappings
|
|
for _, checkID := range session.Checks {
|
|
if _, err := s.sessionCheckTable.DeleteTxn(tx, "id",
|
|
session.Node, checkID, id); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Trigger the update notifications
|
|
if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.sessionTable].Notify() })
|
|
return nil
|
|
}
|
|
|
|
// invalidateLocks is used to invalidate all the locks held by a session
|
|
// within a given txn. All tables should be locked in the tx.
|
|
func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
|
|
lockDelay time.Duration, id string) error {
|
|
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var expires time.Time
|
|
if lockDelay > 0 {
|
|
s.lockDelayLock.Lock()
|
|
defer s.lockDelayLock.Unlock()
|
|
expires = time.Now().Add(lockDelay)
|
|
}
|
|
|
|
for _, pair := range pairs {
|
|
kv := pair.(*structs.DirEntry)
|
|
kv.Session = "" // Clear the lock
|
|
kv.ModifyIndex = index // Update the modified time
|
|
if err := s.kvsTable.InsertTxn(tx, kv); err != nil {
|
|
return err
|
|
}
|
|
// If there is a lock delay, prevent acquisition
|
|
// for at least lockDelay period
|
|
if lockDelay > 0 {
|
|
s.lockDelay[kv.Key] = expires
|
|
time.AfterFunc(lockDelay, func() {
|
|
s.lockDelayLock.Lock()
|
|
delete(s.lockDelay, kv.Key)
|
|
s.lockDelayLock.Unlock()
|
|
})
|
|
}
|
|
tx.Defer(func() { s.notifyKV(kv.Key, false) })
|
|
}
|
|
if len(pairs) > 0 {
|
|
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deleteLocks is used to delete all the locks held by a session
|
|
// within a given txn. All tables should be locked in the tx.
|
|
func (s *StateStore) deleteLocks(index uint64, tx *MDBTxn,
|
|
lockDelay time.Duration, id string) error {
|
|
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var expires time.Time
|
|
if lockDelay > 0 {
|
|
s.lockDelayLock.Lock()
|
|
defer s.lockDelayLock.Unlock()
|
|
expires = time.Now().Add(lockDelay)
|
|
}
|
|
|
|
for _, pair := range pairs {
|
|
kv := pair.(*structs.DirEntry)
|
|
if err := s.kvsDeleteWithIndexTxn(index, tx, "id", kv.Key); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If there is a lock delay, prevent acquisition
|
|
// for at least lockDelay period
|
|
if lockDelay > 0 {
|
|
s.lockDelay[kv.Key] = expires
|
|
time.AfterFunc(lockDelay, func() {
|
|
s.lockDelayLock.Lock()
|
|
delete(s.lockDelay, kv.Key)
|
|
s.lockDelayLock.Unlock()
|
|
})
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ACLSet is used to create or update an ACL entry
|
|
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
|
|
// Check for an ID
|
|
if acl.ID == "" {
|
|
return fmt.Errorf("Missing ACL ID")
|
|
}
|
|
|
|
// Start a new txn
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
// Look for the existing node
|
|
res, err := s.aclTable.GetTxn(tx, "id", acl.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch len(res) {
|
|
case 0:
|
|
acl.CreateIndex = index
|
|
acl.ModifyIndex = index
|
|
case 1:
|
|
exist := res[0].(*structs.ACL)
|
|
acl.CreateIndex = exist.CreateIndex
|
|
acl.ModifyIndex = index
|
|
default:
|
|
panic(fmt.Errorf("Duplicate ACL definition. Internal error"))
|
|
}
|
|
|
|
// Insert the ACL
|
|
if err := s.aclTable.InsertTxn(tx, acl); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Trigger the update notifications
|
|
if err := s.aclTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.aclTable].Notify() })
|
|
return tx.Commit()
|
|
}
|
|
|
|
// ACLRestore is used to restore an ACL. It should only be used when
|
|
// doing a restore, otherwise ACLSet should be used.
|
|
func (s *StateStore) ACLRestore(acl *structs.ACL) error {
|
|
// Start a new txn
|
|
tx, err := s.aclTable.StartTxn(false, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Abort()
|
|
|
|
if err := s.aclTable.InsertTxn(tx, acl); err != nil {
|
|
return err
|
|
}
|
|
if err := s.aclTable.SetMaxLastIndexTxn(tx, acl.ModifyIndex); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// ACLGet is used to get an ACL by ID
|
|
func (s *StateStore) ACLGet(id string) (uint64, *structs.ACL, error) {
|
|
idx, res, err := s.aclTable.Get("id", id)
|
|
var d *structs.ACL
|
|
if len(res) > 0 {
|
|
d = res[0].(*structs.ACL)
|
|
}
|
|
return idx, d, err
|
|
}
|
|
|
|
// ACLList is used to list all the acls
|
|
func (s *StateStore) ACLList() (uint64, []*structs.ACL, error) {
|
|
idx, res, err := s.aclTable.Get("id")
|
|
out := make([]*structs.ACL, len(res))
|
|
for i, raw := range res {
|
|
out[i] = raw.(*structs.ACL)
|
|
}
|
|
return idx, out, err
|
|
}
|
|
|
|
// ACLDelete is used to remove an ACL
|
|
func (s *StateStore) ACLDelete(index uint64, id string) error {
|
|
tx, err := s.tables.StartTxn(false)
|
|
if err != nil {
|
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
|
}
|
|
defer tx.Abort()
|
|
|
|
if n, err := s.aclTable.DeleteTxn(tx, "id", id); err != nil {
|
|
return err
|
|
} else if n > 0 {
|
|
if err := s.aclTable.SetLastIndexTxn(tx, index); err != nil {
|
|
return err
|
|
}
|
|
tx.Defer(func() { s.watch[s.aclTable].Notify() })
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// Snapshot is used to create a point in time snapshot
|
|
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
|
// Begin a new txn on all tables
|
|
tx, err := s.tables.StartTxn(true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Determine the max index
|
|
index, err := s.tables.LastIndexTxn(tx)
|
|
if err != nil {
|
|
tx.Abort()
|
|
return nil, err
|
|
}
|
|
|
|
// Return the snapshot
|
|
snap := &StateSnapshot{
|
|
store: s,
|
|
tx: tx,
|
|
lastIndex: index,
|
|
}
|
|
return snap, nil
|
|
}
|
|
|
|
// LastIndex returns the last index that affects the snapshotted data
|
|
func (s *StateSnapshot) LastIndex() uint64 {
|
|
return s.lastIndex
|
|
}
|
|
|
|
// Nodes returns all the known nodes, the slice alternates between
|
|
// the node name and address
|
|
func (s *StateSnapshot) Nodes() structs.Nodes {
|
|
res, err := s.store.nodeTable.GetTxn(s.tx, "id")
|
|
if err != nil {
|
|
s.store.logger.Printf("[ERR] consul.state: Failed to get nodes: %v", err)
|
|
return nil
|
|
}
|
|
results := make([]structs.Node, len(res))
|
|
for i, r := range res {
|
|
results[i] = *r.(*structs.Node)
|
|
}
|
|
return results
|
|
}
|
|
|
|
// NodeServices is used to return all the services of a given node
|
|
func (s *StateSnapshot) NodeServices(name string) *structs.NodeServices {
|
|
_, res := s.store.parseNodeServices(s.store.tables, s.tx, name)
|
|
return res
|
|
}
|
|
|
|
// NodeChecks is used to return all the checks of a given node
|
|
func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks {
|
|
res, err := s.store.checkTable.GetTxn(s.tx, "id", node)
|
|
_, checks := s.store.parseHealthChecks(s.lastIndex, res, err)
|
|
return checks
|
|
}
|
|
|
|
// KVSDump is used to list all KV entries. It takes a channel and streams
|
|
// back *struct.DirEntry objects. This will block and should be invoked
|
|
// in a goroutine.
|
|
func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
|
|
return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
|
|
}
|
|
|
|
// TombstoneDump is used to dump all tombstone entries. It takes a channel and streams
|
|
// back *struct.DirEntry objects. This will block and should be invoked
|
|
// in a goroutine.
|
|
func (s *StateSnapshot) TombstoneDump(stream chan<- interface{}) error {
|
|
return s.store.tombstoneTable.StreamTxn(stream, s.tx, "id")
|
|
}
|
|
|
|
// SessionList is used to list all the open sessions
|
|
func (s *StateSnapshot) SessionList() ([]*structs.Session, error) {
|
|
res, err := s.store.sessionTable.GetTxn(s.tx, "id")
|
|
out := make([]*structs.Session, len(res))
|
|
for i, raw := range res {
|
|
out[i] = raw.(*structs.Session)
|
|
}
|
|
return out, err
|
|
}
|
|
|
|
// ACLList is used to list all of the ACLs
|
|
func (s *StateSnapshot) ACLList() ([]*structs.ACL, error) {
|
|
res, err := s.store.aclTable.GetTxn(s.tx, "id")
|
|
out := make([]*structs.ACL, len(res))
|
|
for i, raw := range res {
|
|
out[i] = raw.(*structs.ACL)
|
|
}
|
|
return out, err
|
|
}
|