mirror of
https://github.com/status-im/consul.git
synced 2025-01-26 21:51:39 +00:00
consul: First pass to reduce KV watch cost
This commit is contained in:
parent
e86ed70c70
commit
3096d6c749
@ -11,6 +11,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-radix"
|
||||
"github.com/armon/gomdb"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
@ -63,6 +64,14 @@ type StateStore struct {
|
||||
watch map[*MDBTable]*NotifyGroup
|
||||
queryTables map[string]MDBTables
|
||||
|
||||
// kvWatch is a more optimized way of watching for KV changes.
|
||||
// Instead of just using a NotifyGroup for the entire table,
|
||||
// a watcher is instantiated on a given prefix. When a change happens,
|
||||
// only the relevant watchers are woken up. This reduces the cost of
|
||||
// watching for KV changes.
|
||||
kvWatch *radix.Tree
|
||||
kvWatchLock sync.Mutex
|
||||
|
||||
// lockDelay is used to mark certain locks as unacquirable.
|
||||
// When a lock is forcefully released (failing health
|
||||
// check, destroyed session, etc), it is subject to the LockDelay
|
||||
@ -131,6 +140,7 @@ func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*Stat
|
||||
path: path,
|
||||
env: env,
|
||||
watch: make(map[*MDBTable]*NotifyGroup),
|
||||
kvWatch: radix.New(),
|
||||
lockDelay: make(map[string]time.Time),
|
||||
gc: gc,
|
||||
}
|
||||
@ -414,6 +424,58 @@ func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// WatchKV is used to subscribe a channel to changes in KV data
|
||||
func (s *StateStore) WatchKV(prefix string, notify chan struct{}) {
|
||||
s.kvWatchLock.Lock()
|
||||
defer s.kvWatchLock.Unlock()
|
||||
|
||||
// Check for an existing notify group
|
||||
if raw, ok := s.kvWatch.Get(prefix); ok {
|
||||
grp := raw.(*NotifyGroup)
|
||||
grp.Wait(notify)
|
||||
return
|
||||
}
|
||||
|
||||
// Create new notify group
|
||||
grp := &NotifyGroup{}
|
||||
grp.Wait(notify)
|
||||
s.kvWatch.Insert(prefix, grp)
|
||||
}
|
||||
|
||||
// notifyKV is used to notify any KV listeners of a change
|
||||
// on a prefix
|
||||
func (s *StateStore) notifyKV(path string, prefix bool) {
|
||||
// Backwards compatibility for old listeners
|
||||
s.watch[s.kvsTable].Notify()
|
||||
|
||||
s.kvWatchLock.Lock()
|
||||
defer s.kvWatchLock.Unlock()
|
||||
|
||||
var toDelete []string
|
||||
fn := func(s string, v interface{}) bool {
|
||||
group := v.(*NotifyGroup)
|
||||
group.Notify()
|
||||
if s != "" {
|
||||
toDelete = append(toDelete, s)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Invoke any watcher on the path downward to the key.
|
||||
s.kvWatch.WalkPath(path, fn)
|
||||
|
||||
// If the entire prefix may be affected (e.g. delete tree),
|
||||
// invoke the entire prefix
|
||||
if prefix {
|
||||
s.kvWatch.WalkPrefix(path, fn)
|
||||
}
|
||||
|
||||
// Delete the old watch groups
|
||||
for i := len(toDelete) - 1; i >= 0; i-- {
|
||||
s.kvWatch.Delete(toDelete[i])
|
||||
}
|
||||
}
|
||||
|
||||
// QueryTables returns the Tables that are queried for a given query
|
||||
func (s *StateStore) QueryTables(q string) MDBTables {
|
||||
return s.queryTables[q]
|
||||
@ -1298,7 +1360,17 @@ func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex
|
||||
return err
|
||||
}
|
||||
tx.Defer(func() {
|
||||
s.watch[s.kvsTable].Notify()
|
||||
// Trigger the most fine grained notifications if possible
|
||||
switch {
|
||||
case len(parts) == 0:
|
||||
s.notifyKV("", true)
|
||||
case tableIndex == "id":
|
||||
s.notifyKV(parts[0], false)
|
||||
case tableIndex == "id_prefix":
|
||||
s.notifyKV(parts[0], true)
|
||||
default:
|
||||
s.notifyKV("", true)
|
||||
}
|
||||
if s.gc != nil {
|
||||
// If GC is configured, then we hint that this index
|
||||
// required expiration.
|
||||
@ -1426,7 +1498,7 @@ func (s *StateStore) kvsSet(
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return false, err
|
||||
}
|
||||
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
|
||||
tx.Defer(func() { s.notifyKV(d.Key, false) })
|
||||
return true, tx.Commit()
|
||||
}
|
||||
|
||||
@ -1785,12 +1857,12 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
|
||||
s.lockDelayLock.Unlock()
|
||||
})
|
||||
}
|
||||
tx.Defer(func() { s.notifyKV(kv.Key, false) })
|
||||
}
|
||||
if len(pairs) > 0 {
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return err
|
||||
}
|
||||
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user