diff --git a/agent/proxy/manager.go b/agent/proxy/manager.go index dedc6f7378..4e45d22a83 100644 --- a/agent/proxy/manager.go +++ b/agent/proxy/manager.go @@ -6,12 +6,26 @@ import ( "os" "os/exec" "sync" + "time" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-multierror" ) +const ( + // ManagerCoalescePeriod and ManagerQuiescentPeriod relate to how + // notifications in updates from the local state are colaesced to prevent + // lots of churn in the manager. + // + // When the local state updates, the manager will wait for quiescence. + // For each update, the quiscence timer is reset. If the coalesce period + // is reached, the manager will update proxies regardless of the frequent + // changes. Then the whole cycle resets. + ManagerCoalescePeriod = 5 * time.Second + ManagerQuiescentPeriod = 500 * time.Millisecond +) + // Manager starts, stops, snapshots, and restores managed proxies. // // The manager will not start or stop any processes until Start is called. @@ -26,11 +40,9 @@ import ( // and state updates may occur while the Manger is syncing. This is okay, // since a change notification will be queued to trigger another sync. // -// NOTE(mitchellh): Change notifications are not coalesced currently. Under -// conditions where managed proxy configurations are changing in a hot -// loop, it is possible for the manager to constantly attempt to sync. This -// is unlikely, but its also easy to introduce basic coalescing (even over -// millisecond intervals) to prevent total waste compute cycles. +// The change notifications from the local state are coalesced (see +// ManagerCoalescePeriod) so that frequent changes within the local state +// do not trigger dozens of proxy resyncs. type Manager struct { // State is the local state that is the source of truth for all // configured managed proxies. @@ -41,6 +53,13 @@ type Manager struct { // implementation type. Logger *log.Logger + // CoalescePeriod and QuiescencePeriod control the timers for coalescing + // updates from the local state. See the defaults at the top of this + // file for more documentation. These will be set to those defaults + // by NewManager. + CoalescePeriod time.Duration + QuiescentPeriod time.Duration + // lock is held while reading/writing any internal state of the manager. // cond is a condition variable on lock that is broadcasted for runState // changes. @@ -55,22 +74,24 @@ type Manager struct { proxies map[string]Proxy } -// defaultLogger is the defaultLogger for NewManager so there it is never nil -var defaultLogger = log.New(os.Stderr, "", log.LstdFlags) - // NewManager initializes a Manager. After initialization, the exported // fields should be configured as desired. To start the Manager, execute // Run in a goroutine. func NewManager() *Manager { var lock sync.Mutex return &Manager{ - Logger: defaultLogger, - lock: &lock, - cond: sync.NewCond(&lock), - proxies: make(map[string]Proxy), + Logger: defaultLogger, + CoalescePeriod: ManagerCoalescePeriod, + QuiescentPeriod: ManagerQuiescentPeriod, + lock: &lock, + cond: sync.NewCond(&lock), + proxies: make(map[string]Proxy), } } +// defaultLogger is the defaultLogger for NewManager so there it is never nil +var defaultLogger = log.New(os.Stderr, "", log.LstdFlags) + // managerRunState is the state of the Manager. // // This is a basic state machine with the following transitions: @@ -193,19 +214,43 @@ func (m *Manager) Run() { defer m.State.StopNotifyProxy(notifyCh) m.Logger.Println("[DEBUG] agent/proxy: managed Connect proxy manager started") +SYNC: for { // Sync first, before waiting on further notifications so that // we can start with a known-current state. m.sync() - select { - case <-notifyCh: - // Changes exit select so we can reloop and reconfigure proxies + // Note for these variables we don't use a time.Timer because both + // periods are relatively short anyways so they end up being eligible + // for GC very quickly, so overhead is not a concern. + var quiescent, quantum <-chan time.Time - case <-stopCh: - // Stop immediately, no cleanup - m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager") - return + // Start a loop waiting for events from the local state store. This + // loops rather than just `select` so we can coalesce many state + // updates over a period of time. + for { + select { + case <-notifyCh: + // If this is our first notification since the last sync, + // reset the quantum timer which is the max time we'll wait. + if quantum == nil { + quantum = time.After(m.CoalescePeriod) + } + + // Always reset the quiescent timer + quiescent = time.After(m.QuiescentPeriod) + + case <-quantum: + continue SYNC + + case <-quiescent: + continue SYNC + + case <-stopCh: + // Stop immediately, no cleanup + m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager") + return + } } } } diff --git a/agent/proxy/manager_test.go b/agent/proxy/manager_test.go index 97086d4911..4ee84f56aa 100644 --- a/agent/proxy/manager_test.go +++ b/agent/proxy/manager_test.go @@ -17,7 +17,7 @@ func TestManagerClose_noRun(t *testing.T) { t.Parallel() // Really we're testing that it doesn't deadlock here. - m := NewManager() + m := testManager(t) require.NoError(t, m.Close()) // Close again for sanity @@ -30,7 +30,7 @@ func TestManagerRun_initialSync(t *testing.T) { t.Parallel() state := local.TestState(t) - m := NewManager() + m := testManager(t) m.State = state defer m.Kill() @@ -57,7 +57,7 @@ func TestManagerRun_syncNew(t *testing.T) { t.Parallel() state := local.TestState(t) - m := NewManager() + m := testManager(t) m.State = state defer m.Kill() @@ -99,7 +99,7 @@ func TestManagerRun_syncDelete(t *testing.T) { t.Parallel() state := local.TestState(t) - m := NewManager() + m := testManager(t) m.State = state defer m.Kill() @@ -138,7 +138,7 @@ func TestManagerRun_syncUpdate(t *testing.T) { t.Parallel() state := local.TestState(t) - m := NewManager() + m := testManager(t) m.State = state defer m.Kill() @@ -181,6 +181,16 @@ func TestManagerRun_syncUpdate(t *testing.T) { }) } +func testManager(t *testing.T) *Manager { + m := NewManager() + + // Set these periods low to speed up tests + m.CoalescePeriod = 1 * time.Millisecond + m.QuiescentPeriod = 1 * time.Millisecond + + return m +} + // testStateProxy registers a proxy with the given local state and the command // (expected to be from the helperProcess function call). It returns the // ID for deregistration.