mirror of https://github.com/status-im/consul.git
Completes state store for KV, sessions, tombstones, and nodes/services/checks (needs tests and integration).
parent 009fd7d9f5
commit 30736bae5a
@@ -123,7 +123,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
 	state := a.srv.fsm.StateNew()
 	return a.srv.blockingRPCNew(&args.QueryOptions,
 		&reply.QueryMeta,
-		state.GetWatchManager("acls"),
+		state.GetTableWatch("acls"),
 		func() error {
 			acl, err := state.ACLGet(args.ACL)
 			if acl != nil {
@@ -194,7 +194,7 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
 	state := a.srv.fsm.StateNew()
 	return a.srv.blockingRPCNew(&args.QueryOptions,
 		&reply.QueryMeta,
-		state.GetWatchManager("acls"),
+		state.GetTableWatch("acls"),
 		func() error {
 			var err error
 			reply.Index, reply.ACLs, err = state.ACLList()
consul/fsm.go | 137
@@ -136,9 +136,9 @@ func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} {
 }
 
 func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
-	defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
-	// Apply all updates in a single transaction
-	if err := c.state.EnsureRegistration(index, req); err != nil {
+	defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
+	if err := c.stateNew.EnsureRegistration(index, req); err != nil {
 		c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err)
 		return err
 	}
@@ -154,17 +154,17 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
 
 	// Either remove the service entry or the whole node
 	if req.ServiceID != "" {
-		if err := c.state.DeleteNodeService(index, req.Node, req.ServiceID); err != nil {
+		if err := c.stateNew.DeleteService(index, req.Node, req.ServiceID); err != nil {
 			c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err)
 			return err
 		}
 	} else if req.CheckID != "" {
-		if err := c.state.DeleteNodeCheck(index, req.Node, req.CheckID); err != nil {
+		if err := c.stateNew.DeleteCheck(index, req.Node, req.CheckID); err != nil {
 			c.logger.Printf("[INFO] consul.fsm: DeleteNodeCheck failed: %v", err)
 			return err
 		}
 	} else {
-		if err := c.state.DeleteNode(index, req.Node); err != nil {
+		if err := c.stateNew.DeleteNode(index, req.Node); err != nil {
 			c.logger.Printf("[INFO] consul.fsm: DeleteNode failed: %v", err)
 			return err
 		}
@@ -180,34 +180,34 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
 	defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now())
 	switch req.Op {
 	case structs.KVSSet:
-		return c.state.KVSSet(index, &req.DirEnt)
+		return c.stateNew.KVSSet(index, &req.DirEnt)
 	case structs.KVSDelete:
-		return c.state.KVSDelete(index, req.DirEnt.Key)
+		return c.stateNew.KVSDelete(index, req.DirEnt.Key)
 	case structs.KVSDeleteCAS:
-		act, err := c.state.KVSDeleteCheckAndSet(index, req.DirEnt.Key, req.DirEnt.ModifyIndex)
+		act, err := c.stateNew.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key)
 		if err != nil {
 			return err
 		} else {
 			return act
 		}
 	case structs.KVSDeleteTree:
-		return c.state.KVSDeleteTree(index, req.DirEnt.Key)
+		return c.stateNew.KVSDeleteTree(index, req.DirEnt.Key)
 	case structs.KVSCAS:
-		act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
+		act, err := c.stateNew.KVSSetCAS(index, &req.DirEnt)
 		if err != nil {
 			return err
 		} else {
 			return act
 		}
 	case structs.KVSLock:
-		act, err := c.state.KVSLock(index, &req.DirEnt)
+		act, err := c.stateNew.KVSLock(index, &req.DirEnt)
 		if err != nil {
 			return err
 		} else {
 			return act
 		}
 	case structs.KVSUnlock:
-		act, err := c.state.KVSUnlock(index, &req.DirEnt)
+		act, err := c.stateNew.KVSUnlock(index, &req.DirEnt)
 		if err != nil {
 			return err
 		} else {
@@ -228,13 +228,13 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
 	defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now())
 	switch req.Op {
 	case structs.SessionCreate:
-		if err := c.state.SessionCreate(index, &req.Session); err != nil {
+		if err := c.stateNew.SessionCreate(index, &req.Session); err != nil {
 			return err
 		} else {
 			return req.Session.ID
 		}
 	case structs.SessionDestroy:
-		return c.state.SessionDestroy(index, req.Session.ID)
+		return c.stateNew.SessionDestroy(index, req.Session.ID)
 	default:
 		c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op)
 		return fmt.Errorf("Invalid Session operation '%s'", req.Op)
@@ -270,7 +270,7 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
 	defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now())
 	switch req.Op {
 	case structs.TombstoneReap:
-		return c.state.ReapTombstones(req.ReapIndex)
+		return c.stateNew.ReapTombstones(req.ReapIndex)
 	default:
 		c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op)
 		return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op)
@@ -300,12 +300,12 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
 	}
 
 	// Create a new state store
-	state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
+	store, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
 	if err != nil {
 		return err
 	}
 	c.state.Close()
-	c.state = state
+	c.state = store
 
 	// Create a decoder
 	dec := codec.NewDecoder(old, msgpackHandle)
@@ -341,7 +341,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
 			if err := dec.Decode(&req); err != nil {
 				return err
 			}
-			if err := c.state.KVSRestore(&req); err != nil {
+			if err := c.stateNew.KVSRestore(&req); err != nil {
 				return err
 			}
 
@@ -350,7 +350,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
 			if err := dec.Decode(&req); err != nil {
 				return err
 			}
-			if err := c.state.SessionRestore(&req); err != nil {
+			if err := c.stateNew.SessionRestore(&req); err != nil {
 				return err
 			}
 
@@ -368,7 +368,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
 			if err := dec.Decode(&req); err != nil {
 				return err
 			}
-			if err := c.state.TombstoneRestore(&req); err != nil {
+
+			// For historical reasons, these are serialized in the
+			// snapshots as KV entries. We want to keep the snapshot
+			// format compatible with pre-0.6 versions for now.
+			stone := &state.Tombstone{
+				Key:   req.Key,
+				Index: req.ModifyIndex,
+			}
+			if err := c.stateNew.TombstoneRestore(stone); err != nil {
 				return err
 			}
 
@@ -387,7 +395,7 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
 
 	// Write the header
 	header := snapshotHeader{
-		LastIndex: s.state.LastIndex(),
+		LastIndex: s.stateNew.LastIndex(),
 	}
 	if err := encoder.Encode(&header); err != nil {
 		sink.Cancel()
@@ -423,8 +431,12 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
 
 func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
 
 	// Get all the nodes
-	nodes := s.state.Nodes()
+	nodes, err := s.stateNew.NodeDump()
+	if err != nil {
+		return err
+	}
 
 	// Register each node
 	var req structs.RegisterRequest
@@ -441,8 +453,11 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
 		}
 
 		// Register each service this node has
-		services := s.state.NodeServices(nodes[i].Node)
-		for _, srv := range services.Services {
+		services, err := s.stateNew.ServiceDump(nodes[i].Node)
+		if err != nil {
+			return err
+		}
+		for _, srv := range services {
 			req.Service = srv
 			sink.Write([]byte{byte(structs.RegisterRequestType)})
 			if err := encoder.Encode(&req); err != nil {
@@ -452,7 +467,10 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
 
 		// Register each check this node has
 		req.Service = nil
-		checks := s.state.NodeChecks(nodes[i].Node)
+		checks, err := s.stateNew.CheckDump(nodes[i].Node)
+		if err != nil {
+			return err
+		}
 		for _, check := range checks {
 			req.Check = check
 			sink.Write([]byte{byte(structs.RegisterRequestType)})
@@ -466,7 +484,7 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
 
 func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
-	sessions, err := s.state.SessionList()
+	sessions, err := s.stateNew.SessionDump()
 	if err != nil {
 		return err
 	}
@@ -482,7 +500,7 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
 
 func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
-	acls, err := s.stateNew.ACLList()
+	acls, err := s.stateNew.ACLDump()
 	if err != nil {
 		return err
 	}
@@ -498,58 +516,47 @@ func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
 
 func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
-	streamCh := make(chan interface{}, 256)
-	errorCh := make(chan error)
-	go func() {
-		if err := s.state.KVSDump(streamCh); err != nil {
-			errorCh <- err
-		}
-	}()
+	entries, err := s.stateNew.KVSDump()
+	if err != nil {
+		return err
+	}
 
-	for {
-		select {
-		case raw := <-streamCh:
-			if raw == nil {
-				return nil
-			}
-			sink.Write([]byte{byte(structs.KVSRequestType)})
-			if err := encoder.Encode(raw); err != nil {
-				return err
-			}
-
-		case err := <-errorCh:
+	for _, e := range entries {
+		sink.Write([]byte{byte(structs.KVSRequestType)})
+		if err := encoder.Encode(e); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
 func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
 	encoder *codec.Encoder) error {
-	streamCh := make(chan interface{}, 256)
-	errorCh := make(chan error)
-	go func() {
-		if err := s.state.TombstoneDump(streamCh); err != nil {
-			errorCh <- err
-		}
-	}()
+	stones, err := s.stateNew.TombstoneDump()
+	if err != nil {
+		return err
+	}
 
-	for {
-		select {
-		case raw := <-streamCh:
-			if raw == nil {
-				return nil
-			}
-			sink.Write([]byte{byte(structs.TombstoneRequestType)})
-			if err := encoder.Encode(raw); err != nil {
-				return err
-			}
-
-		case err := <-errorCh:
+	for _, s := range stones {
+		sink.Write([]byte{byte(structs.TombstoneRequestType)})
+
+		// For historical reasons, these are serialized in the snapshots
+		// as KV entries. We want to keep the snapshot format compatible
+		// with pre-0.6 versions for now.
+		fake := &structs.DirEntry{
+			Key: s.Key,
+			RaftIndex: structs.RaftIndex{
+				ModifyIndex: s.Index,
+			},
+		}
+		if err := encoder.Encode(fake); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
 func (s *consulSnapshot) Release() {
-	s.state.Close()
+	s.stateNew.Close()
 }
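Aside (not part of the diff): the persist/restore hunks above keep tombstones snapshot-compatible with pre-0.6 agents by writing them in the KV-entry shape and rebuilding a Tombstone when restoring. A self-contained sketch of that round-trip, using local stand-in types and the same go-msgpack codec the FSM uses; the key and index values are made up:

package main

import (
	"bytes"
	"fmt"

	"github.com/hashicorp/go-msgpack/codec"
)

// Local stand-ins so the sketch is self-contained; the real code uses
// structs.DirEntry and state.Tombstone.
type dirEntry struct {
	Key         string
	ModifyIndex uint64
}

type tombstone struct {
	Key   string
	Index uint64
}

func main() {
	var buf bytes.Buffer
	handle := &codec.MsgpackHandle{}

	// Persist side: serialize the tombstone in the legacy KV-entry shape.
	fake := dirEntry{Key: "foo/bar", ModifyIndex: 42}
	if err := codec.NewEncoder(&buf, handle).Encode(&fake); err != nil {
		panic(err)
	}

	// Restore side: decode the KV-entry shape and rebuild the tombstone.
	var req dirEntry
	if err := codec.NewDecoder(&buf, handle).Decode(&req); err != nil {
		panic(err)
	}
	stone := &tombstone{Key: req.Key, Index: req.ModifyIndex}
	fmt.Printf("restored tombstone %q at index %d\n", stone.Key, stone.Index)
}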
@@ -400,7 +400,7 @@ RUN_QUERY:
 
 // TODO(slackpad)
 func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
-	watch state.WatchManager, run func() error) error {
+	watch state.Watch, run func() error) error {
 	var timeout *time.Timer
 	var notifyCh chan struct{}
 
@@ -409,9 +409,9 @@ func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *stru
 		goto RUN_QUERY
 	}
 
-	// Make sure a watch manager was given if we were asked to block.
+	// Make sure a watch was given if we were asked to block.
 	if watch == nil {
-		panic("no watch manager given for blocking query")
+		panic("no watch given for blocking query")
 	}
 
 	// Restrict the max query time, and ensure there is always one.
@@ -433,13 +433,13 @@ func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *stru
 	// Ensure we tear down any watches on return.
 	defer func() {
 		timeout.Stop()
-		watch.Stop(notifyCh)
+		watch.Clear(notifyCh)
 	}()
 
 REGISTER_NOTIFY:
 	// Register the notification channel. This may be done multiple times if
 	// we haven't reached the target wait index.
-	watch.Start(notifyCh)
+	watch.Wait(notifyCh)
 
 RUN_QUERY:
 	// Update the query metadata.
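Aside (not part of the diff): blockingRPCNew now takes the simpler state.Watch (Wait/Clear) instead of a WatchManager. Below is a rough, self-contained sketch of the same arm-query-wait loop, with a toy watch implementation standing in for FullTableWatch; the names, indexes, and timings are illustrative only, not the Consul implementation:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// watch mirrors the Wait/Clear shape of the new state.Watch interface.
type watch interface {
	Wait(notifyCh chan struct{})
	Clear(notifyCh chan struct{})
}

// toyWatch is a minimal, mutex-guarded stand-in for FullTableWatch.
type toyWatch struct {
	mu  sync.Mutex
	chs map[chan struct{}]struct{}
}

func newToyWatch() *toyWatch { return &toyWatch{chs: make(map[chan struct{}]struct{})} }

func (w *toyWatch) Wait(ch chan struct{}) {
	w.mu.Lock()
	w.chs[ch] = struct{}{}
	w.mu.Unlock()
}

func (w *toyWatch) Clear(ch chan struct{}) {
	w.mu.Lock()
	delete(w.chs, ch)
	w.mu.Unlock()
}

func (w *toyWatch) Notify() {
	w.mu.Lock()
	defer w.mu.Unlock()
	for ch := range w.chs {
		select {
		case ch <- struct{}{}: // non-blocking wakeup
		default:
		}
	}
}

// blockingQuery mimics the shape of blockingRPCNew: arm the watch, run the
// query, and re-run it when notified, or give up when the timeout fires.
func blockingQuery(w watch, minIndex uint64, maxWait time.Duration,
	run func() (uint64, error)) (uint64, error) {

	timeout := time.NewTimer(maxWait)
	notifyCh := make(chan struct{}, 1)
	defer func() {
		timeout.Stop()
		w.Clear(notifyCh)
	}()

	for {
		// Register the channel before running the query so an update that
		// lands between the query and the wait is not missed.
		w.Wait(notifyCh)

		index, err := run()
		if err != nil || index > minIndex {
			return index, err
		}

		select {
		case <-notifyCh:
			// Data may have changed; loop and run the query again.
		case <-timeout.C:
			return index, nil
		}
	}
}

func main() {
	w := newToyWatch()
	var index uint64 = 5

	// Simulate a write that bumps the index and fires the watch.
	go func() {
		time.Sleep(50 * time.Millisecond)
		atomic.StoreUint64(&index, 6)
		w.Notify()
	}()

	got, err := blockingQuery(w, 5, time.Second, func() (uint64, error) {
		return atomic.LoadUint64(&index), nil
	})
	fmt.Println(got, err)
}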
@@ -0,0 +1,54 @@
+package state
+
+import (
+	"sync"
+	"time"
+)
+
+// Delay is used to mark certain locks as unacquirable. When a lock is
+// forcefully released (failing health check, destroyed session, etc.), it is
+// subject to the LockDelay imposed by the session. This prevents another
+// session from acquiring the lock for some period of time as a protection
+// against split-brains. This is inspired by the lock-delay in Chubby. Because
+// this relies on wall-time, we cannot assume all peers perceive time as flowing
+// uniformly. This means KVSLock MUST ignore lockDelay, since the lockDelay may
+// have expired on the leader, but not on the follower. Rejecting the lock could
+// result in inconsistencies in the FSMs due to the rate time progresses. Instead,
+// only the opinion of the leader is respected, and the Raft log is never
+// questioned.
+type Delay struct {
+	// delay has the set of active delay expiration times, organized by key.
+	delay map[string]time.Time
+
+	// lock protects the delay map.
+	lock sync.RWMutex
+}
+
+// NewDelay returns a new delay manager.
+func NewDelay() *Delay {
+	return &Delay{delay: make(map[string]time.Time)}
+}
+
+// GetExpiration returns the expiration time of a key lock delay. This must be
+// checked on the leader node, and not in KVSLock due to the variability of
+// clocks.
+func (d *Delay) GetExpiration(key string) time.Time {
+	d.lock.RLock()
+	expires := d.delay[key]
+	d.lock.RUnlock()
+	return expires
+}
+
+// SetExpiration sets the expiration time for the lock delay to the given
+// delay from the given now time.
+func (d *Delay) SetExpiration(key string, now time.Time, delay time.Duration) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	d.delay[key] = now.Add(delay)
+	time.AfterFunc(delay, func() {
+		d.lock.Lock()
+		delete(d.delay, key)
+		d.lock.Unlock()
+	})
+}
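Aside (not part of the diff): a minimal sketch of how the leader might drive the new Delay type; the import path, key name, and duration below are assumptions for illustration:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul/consul/state" // assumed import path for the new state package
)

func main() {
	d := state.NewDelay()

	// Hypothetical scenario: a session holding "locks/db" was invalidated,
	// so the leader arms a 15s lock-delay on that key.
	d.SetExpiration("locks/db", time.Now(), 15*time.Second)

	// Later, before granting a lock, the leader checks the delay.
	if expires := d.GetExpiration("locks/db"); time.Now().Before(expires) {
		fmt.Println("lock delay still active until", expires)
	} else {
		fmt.Println("lock may be acquired")
	}
}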
@@ -0,0 +1,103 @@
+package state
+
+import (
+	"fmt"
+
+	"github.com/hashicorp/go-memdb"
+)
+
+// Tombstone is the internal type used to track tombstones.
+type Tombstone struct {
+	Key   string
+	Index uint64
+}
+
+// Graveyard manages a set of tombstones for a table. This is just used for
+// KVS right now but we've broken it out for other table types later.
+type Graveyard struct {
+	Table string
+}
+
+// NewGraveyard returns a new graveyard.
+func NewGraveyard(table string) *Graveyard {
+	return &Graveyard{Table: "tombstones_" + table}
+}
+
+// InsertTxn adds a new tombstone.
+func (g *Graveyard) InsertTxn(tx *memdb.Txn, context string, idx uint64) error {
+	stone := &Tombstone{Key: context, Index: idx}
+	if err := tx.Insert(g.Table, stone); err != nil {
+		return fmt.Errorf("failed inserting tombstone: %s", err)
+	}
+
+	if err := tx.Insert("index", &IndexEntry{g.Table, idx}); err != nil {
+		return fmt.Errorf("failed updating index: %s", err)
+	}
+	return nil
+}
+
+// GetMaxIndexTxn returns the highest index tombstone whose key matches the
+// given context, using a prefix match.
+func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, context string) (uint64, error) {
+	stones, err := tx.Get(g.Table, "id", context)
+	if err != nil {
+		return 0, fmt.Errorf("failed querying tombstones: %s", err)
+	}
+
+	var lindex uint64
+	for stone := stones.Next(); stone != nil; stone = stones.Next() {
+		r := stone.(*Tombstone)
+		if r.Index > lindex {
+			lindex = r.Index
+		}
+	}
+	return lindex, nil
+}
+
+// DumpTxn returns all the tombstones.
+func (g *Graveyard) DumpTxn(tx *memdb.Txn) ([]*Tombstone, error) {
+	stones, err := tx.Get(g.Table, "id", "")
+	if err != nil {
+		return nil, fmt.Errorf("failed querying tombstones: %s", err)
+	}
+
+	var dump []*Tombstone
+	for stone := stones.Next(); stone != nil; stone = stones.Next() {
+		dump = append(dump, stone.(*Tombstone))
+	}
+	return dump, nil
+}
+
+// RestoreTxn is used when restoring from a snapshot. For general inserts, use
+// InsertTxn.
+func (g *Graveyard) RestoreTxn(tx *memdb.Txn, stone *Tombstone) error {
+	if err := tx.Insert(g.Table, stone); err != nil {
+		return fmt.Errorf("failed inserting tombstone: %s", err)
+	}
+
+	if err := indexUpdateMaxTxn(tx, stone.Index, g.Table); err != nil {
+		return fmt.Errorf("failed updating index: %s", err)
+	}
+	return nil
+}
+
+// ReapTxn cleans out all tombstones whose index values are less than or equal
+// to the given idx. This prevents unbounded storage growth of the tombstones.
+func (g *Graveyard) ReapTxn(tx *memdb.Txn, idx uint64) error {
+	// This does a full table scan since we currently can't index on a
+	// numeric value. Since this is all in-memory and done infrequently
+	// this is pretty reasonable.
+	stones, err := tx.Get(g.Table, "id", "")
+	if err != nil {
+		return fmt.Errorf("failed querying tombstones: %s", err)
+	}
+
+	for stone := stones.Next(); stone != nil; stone = stones.Next() {
+		if stone.(*Tombstone).Index <= idx {
+			if err := tx.Delete(g.Table, stone); err != nil {
+				return fmt.Errorf("failed deleting tombstone: %s", err)
+			}
+		}
+	}
+	return nil
+}
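Aside (not part of the diff): a toy, dependency-free illustration of why deletes go through a graveyard at all. Without a tombstone carrying the delete index, a table's maximum ModifyIndex could slide backwards after a delete, which would confuse blocking queries. The types and values below are stand-ins, not Consul code:

package main

import "fmt"

type entry struct{ modifyIndex uint64 }

// maxIndex returns the highest index visible for the table: live entries
// plus any tombstones left behind by deletes.
func maxIndex(live map[string]entry, tombstones map[string]uint64) uint64 {
	var max uint64
	for _, e := range live {
		if e.modifyIndex > max {
			max = e.modifyIndex
		}
	}
	for _, idx := range tombstones {
		if idx > max {
			max = idx
		}
	}
	return max
}

func main() {
	live := map[string]entry{"foo": {modifyIndex: 5}, "bar": {modifyIndex: 3}}
	tombstones := map[string]uint64{}

	fmt.Println("before delete:", maxIndex(live, tombstones)) // 5

	// Delete "foo" at index 6: record a tombstone instead of just forgetting
	// the key, so the observed index moves to 6 rather than back to 3.
	delete(live, "foo")
	tombstones["foo"] = 6

	fmt.Println("after delete:", maxIndex(live, tombstones)) // 6
}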
@@ -25,7 +25,7 @@ func stateStoreSchema() *memdb.DBSchema {
 		servicesTableSchema,
 		checksTableSchema,
 		kvsTableSchema,
-		tombstonesTableSchema,
+		func() *memdb.TableSchema { return tombstonesTableSchema("kvs") },
 		sessionsTableSchema,
 		sessionChecksTableSchema,
 		aclsTableSchema,
@@ -177,7 +177,6 @@ func checksTableSchema() *memdb.TableSchema {
 					Lowercase: true,
 				},
 			},
-			// TODO(slackpad): This one is new, where is it used?
 			"node_service": &memdb.IndexSchema{
 				Name:         "node_service",
 				AllowMissing: true,
@@ -231,11 +230,11 @@ func kvsTableSchema() *memdb.TableSchema {
 }
 
 // tombstonesTableSchema returns a new table schema used for
-// storing tombstones during kvs delete operations to prevent
-// the index from sliding backwards.
-func tombstonesTableSchema() *memdb.TableSchema {
+// storing tombstones during the given table's delete operations
+// to prevent the index from sliding backwards.
+func tombstonesTableSchema(table string) *memdb.TableSchema {
 	return &memdb.TableSchema{
-		Name: "tombstones",
+		Name: "tombstones_" + table,
 		Indexes: map[string]*memdb.IndexSchema{
 			"id": &memdb.IndexSchema{
 				Name: "id",
@@ -305,21 +304,10 @@ func sessionChecksTableSchema() *memdb.TableSchema {
 				},
 			},
 		},
-			// TODO(slackpad): Where did these come from?
-			"session": &memdb.IndexSchema{
-				Name:         "session",
-				AllowMissing: false,
-				Unique:       true,
-				Indexer: &memdb.StringFieldIndex{
-					Field:     "Session",
-					Lowercase: false,
-				},
-			},
-			// TODO(slackpad): Should this be called node_session?
-			"node": &memdb.IndexSchema{
-				Name:         "node",
-				AllowMissing: false,
-				Unique:       true,
+			"node_check": &memdb.IndexSchema{
+				Name:         "node_check",
+				AllowMissing: false,
+				Unique:       false,
 				Indexer: &memdb.CompoundIndex{
 					Indexes: []memdb.Indexer{
 						&memdb.StringFieldIndex{
@@ -327,12 +315,21 @@ func sessionChecksTableSchema() *memdb.TableSchema {
 							Lowercase: true,
 						},
 						&memdb.StringFieldIndex{
-							Field:     "Session",
-							Lowercase: false,
+							Field:     "CheckID",
+							Lowercase: true,
 						},
 					},
 				},
 			},
+			"session": &memdb.IndexSchema{
+				Name:         "session",
+				AllowMissing: false,
+				Unique:       true,
+				Indexer: &memdb.StringFieldIndex{
+					Field:     "Session",
+					Lowercase: false,
+				},
+			},
 		},
 	}
 }
File diff suppressed because it is too large
@@ -1140,7 +1140,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
 		t.Fatalf("bad index: %d", idx)
 	}
 
-	// Deleting a nonexistent key should be idempotent and note return an
+	// Deleting a nonexistent key should be idempotent and not return an
 	// error
 	if err := s.KVSDelete(4, "foo"); err != nil {
 		t.Fatalf("err: %s", err)
@@ -1519,7 +1519,7 @@ func TestStateStore_SessionList(t *testing.T) {
 	testRegisterNode(t, s, 3, "node3")
 
 	// Create some sessions in the state store
-	sessions := []*structs.Session{
+	sessions := structs.Sessions{
 		&structs.Session{
 			ID:   "session1",
 			Node: "node1",
@@ -1569,7 +1569,7 @@ func TestStateStore_NodeSessions(t *testing.T) {
 	testRegisterNode(t, s, 2, "node2")
 
 	// Register some sessions with the nodes
-	sessions1 := []*structs.Session{
+	sessions1 := structs.Sessions{
 		&structs.Session{
 			ID:   "session1",
 			Node: "node1",
@@ -1758,7 +1758,7 @@ func TestStateStore_ACLList(t *testing.T) {
 	}
 
 	// Insert some ACLs
-	acls := []*structs.ACL{
+	acls := structs.ACLs{
 		&structs.ACL{
 			ID:   "acl1",
 			Type: structs.ACLTypeClient,
@@ -1839,7 +1839,7 @@ func TestStateStore_ACL_Watches(t *testing.T) {
 	s := testStateStore(t)
 	ch := make(chan struct{})
 
-	s.GetWatchManager("acls").Start(ch)
+	s.GetTableWatch("acls").Wait(ch)
 	go func() {
 		if err := s.ACLSet(1, &structs.ACL{ID: "acl1"}); err != nil {
 			t.Fatalf("err: %s", err)
@@ -1851,7 +1851,7 @@ func TestStateStore_ACL_Watches(t *testing.T) {
 		t.Fatalf("watch was not notified")
 	}
 
-	s.GetWatchManager("acls").Start(ch)
+	s.GetTableWatch("acls").Wait(ch)
 	go func() {
 		if err := s.ACLDelete(2, "acl1"); err != nil {
 			t.Fatalf("err: %s", err)
@@ -1863,7 +1863,7 @@ func TestStateStore_ACL_Watches(t *testing.T) {
 		t.Fatalf("watch was not notified")
 	}
 
-	s.GetWatchManager("acls").Start(ch)
+	s.GetTableWatch("acls").Wait(ch)
 	go func() {
 		if err := s.ACLRestore(&structs.ACL{ID: "acl1"}); err != nil {
 			t.Fatalf("err: %s", err)
@@ -1,35 +1,132 @@
 package state
 
 import (
-	"github.com/hashicorp/go-memdb"
+	"sync"
+
+	"github.com/armon/go-radix"
 )
 
-type WatchManager interface {
-	Start(notifyCh chan struct{})
-	Stop(notifyCh chan struct{})
-	Notify()
+// Watch is the external interface that's common to all the different flavors.
+type Watch interface {
+	// Wait registers the given channel and calls it back when the watch
+	// fires.
+	Wait(notifyCh chan struct{})
+
+	// Clear deregisters the given channel.
+	Clear(notifyCh chan struct{})
 }
 
+// FullTableWatch implements a single notify group for a table.
 type FullTableWatch struct {
-	notify NotifyGroup
+	group NotifyGroup
 }
 
-func (w *FullTableWatch) Start(notifyCh chan struct{}) {
-	w.notify.Wait(notifyCh)
+// NewFullTableWatch returns a new full table watch.
+func NewFullTableWatch() *FullTableWatch {
+	return &FullTableWatch{}
 }
 
-func (w *FullTableWatch) Stop(notifyCh chan struct{}) {
-	w.notify.Clear(notifyCh)
+// See Watch.
+func (w *FullTableWatch) Wait(notifyCh chan struct{}) {
+	w.group.Wait(notifyCh)
 }
 
+// See Watch.
+func (w *FullTableWatch) Clear(notifyCh chan struct{}) {
+	w.group.Clear(notifyCh)
+}
+
+// Notify wakes up all the watchers registered for this table.
 func (w *FullTableWatch) Notify() {
-	w.notify.Notify()
+	w.group.Notify()
 }
 
-func newWatchManagers(schema *memdb.DBSchema) (map[string]WatchManager, error) {
-	watches := make(map[string]WatchManager)
-	for table, _ := range schema.Tables {
-		watches[table] = &FullTableWatch{}
-	}
-	return watches, nil
+// DumbWatchManager is a wrapper that allows nested code to arm full table
+// watches multiple times but fire them only once.
+type DumbWatchManager struct {
+	tableWatches map[string]*FullTableWatch
+	armed        map[string]bool
+}
+
+// NewDumbWatchManager returns a new dumb watch manager.
+func NewDumbWatchManager(tableWatches map[string]*FullTableWatch) *DumbWatchManager {
+	return &DumbWatchManager{
+		tableWatches: tableWatches,
+		armed:        make(map[string]bool),
+	}
+}
+
+// Arm arms the given table's watch.
+func (d *DumbWatchManager) Arm(table string) {
+	if _, ok := d.armed[table]; !ok {
+		d.armed[table] = true
+	}
+}
+
+// Notify fires watches for all the armed tables.
+func (d *DumbWatchManager) Notify() {
+	for table, _ := range d.armed {
+		d.tableWatches[table].Notify()
+	}
+}
+
+// PrefixWatch maintains a notify group for each prefix, allowing for much more
+// fine-grained watches.
+type PrefixWatch struct {
+	// watches has the set of notify groups, organized by prefix.
+	watches *radix.Tree
+
+	// lock protects the watches tree.
+	lock sync.Mutex
+}
+
+// NewPrefixWatch returns a new prefix watch.
+func NewPrefixWatch() *PrefixWatch {
+	return &PrefixWatch{watches: radix.New()}
+}
+
+// GetSubwatch returns the notify group for the given prefix.
+func (w *PrefixWatch) GetSubwatch(prefix string) *NotifyGroup {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	if raw, ok := w.watches.Get(prefix); ok {
+		return raw.(*NotifyGroup)
+	}
+
+	group := &NotifyGroup{}
+	w.watches.Insert(prefix, group)
+	return group
+}
+
+// Notify wakes up all the watchers associated with the given prefix. If subtree
+// is true then we will also notify all the tree under the prefix, such as when
+// a key is being deleted.
+func (w *PrefixWatch) Notify(prefix string, subtree bool) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	var cleanup []string
+	fn := func(k string, v interface{}) bool {
+		group := v.(*NotifyGroup)
+		group.Notify()
+		if k != "" {
+			cleanup = append(cleanup, k)
+		}
+		return false
+	}
+
+	// Invoke any watcher on the path downward to the key.
+	w.watches.WalkPath(prefix, fn)
+
+	// If the entire prefix may be affected (e.g. delete tree),
+	// invoke the entire prefix.
+	if subtree {
+		w.watches.WalkPrefix(prefix, fn)
+	}
+
+	// Delete the old notify groups.
+	for i := len(cleanup) - 1; i >= 0; i-- {
+		w.watches.Delete(cleanup[i])
+	}
 }
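Aside (not part of the diff): a small sketch of the new Wait/Clear/Notify flow on a FullTableWatch, as a blocking query consumer would use it; the import path is assumed and the scenario is made up:

package main

import (
	"fmt"

	"github.com/hashicorp/consul/consul/state" // assumed import path for the new state package
)

func main() {
	// Register interest in a table, as a blocking query would.
	w := state.NewFullTableWatch()
	notifyCh := make(chan struct{}, 1)
	w.Wait(notifyCh)
	defer w.Clear(notifyCh)

	// A write to the table would call Notify and wake the watcher.
	w.Notify()

	<-notifyCh
	fmt.Println("table changed; re-run the query")
}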
@@ -356,6 +356,22 @@ type DirEntry struct {
 
 	RaftIndex
 }
 
+// Returns a clone of the given directory entry.
+func (d *DirEntry) Clone() *DirEntry {
+	return &DirEntry{
+		LockIndex: d.LockIndex,
+		Key:       d.Key,
+		Flags:     d.Flags,
+		Value:     d.Value,
+		Session:   d.Session,
+		RaftIndex: RaftIndex{
+			CreateIndex: d.CreateIndex,
+			ModifyIndex: d.ModifyIndex,
+		},
+	}
+}
+
+type DirEntries []*DirEntry
+
 type KVSOp string
 
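Aside (not part of the diff): DirEntry.Clone supports a copy-on-write pattern. Entries handed out by an in-memory store are shared by reference, so an update should mutate a copy rather than the stored object. A local stand-in type keeps the sketch self-contained:

package main

import "fmt"

type dirEntry struct {
	Key   string
	Flags uint64
	Value []byte
}

// clone mirrors the shallow-copy shape of DirEntry.Clone.
func (d *dirEntry) clone() *dirEntry {
	return &dirEntry{Key: d.Key, Flags: d.Flags, Value: d.Value}
}

func main() {
	stored := &dirEntry{Key: "foo", Flags: 1, Value: []byte("v1")}

	// Copy-on-write: mutate a clone, never the stored object itself.
	update := stored.clone()
	update.Flags = 2

	fmt.Println(stored.Flags, update.Flags) // 1 2
}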