Mirror of https://github.com/status-im/consul.git (synced 2025-01-11 06:16:08 +00:00)

agent/proxy: implement periodic snapshotting in the manager

Commit e0bbe66427, parent 13ff115436.
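
For orientation, here is a minimal sketch of how a caller could enable the periodic snapshotting added below. It only uses identifiers that appear in this diff (NewManager, DataDir, SnapshotPeriod, SnapshotPath, Run, Close); the import path, the example paths, and the omitted State/Logger wiring are assumptions rather than part of the commit.

package main

import (
	"log"
	"time"

	"github.com/hashicorp/consul/agent/proxy" // assumed import path for the agent/proxy package
)

func main() {
	m := proxy.NewManager()

	// Snapshotting only has an effect when DataDir is set; the snapshot
	// file lives at m.SnapshotPath() inside this directory.
	m.DataDir = "/tmp/consul-proxy-data" // illustrative path

	// Defaults to ManagerSnapshotPeriod (1s). Setting it lower is cheap
	// because a snapshot identical to the previous one is never rewritten.
	m.SnapshotPeriod = 250 * time.Millisecond

	// NOTE: a real caller must also wire m.State (and usually m.Logger)
	// before calling Run; that setup is omitted in this sketch.
	go m.Run()
	defer m.Close()

	log.Println("snapshots will be written to:", m.SnapshotPath())
	time.Sleep(1 * time.Second)
}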
@@ -25,6 +25,11 @@ const (
 	// changes. Then the whole cycle resets.
 	ManagerCoalescePeriod = 5 * time.Second
 	ManagerQuiescentPeriod = 500 * time.Millisecond
+
+	// ManagerSnapshotPeriod is the interval that snapshots are taken.
+	// The last snapshot state is preserved and if it matches, a file isn't
+	// written, so it's safe for this to be reasonably frequent.
+	ManagerSnapshotPeriod = 1 * time.Second
 )
 
 // Manager starts, stops, snapshots, and restores managed proxies.
@@ -64,9 +69,14 @@ type Manager struct {
 	//
 	DataDir string
 
-	// SnapshotDir is the path to the directory where snapshots will
-	// be written
-	SnapshotDir string
+	// SnapshotPeriod is the duration between snapshots. This can be set
+	// relatively low to ensure accuracy, because if the new snapshot matches
+	// the last snapshot taken, no file will be written. Therefore, setting
+	// this low causes only slight CPU/memory usage but doesn't result in
+	// disk IO. If this isn't set, ManagerSnapshotPeriod will be the default.
+	//
+	// This only has an effect if snapshots are enabled (DataDir is set).
+	SnapshotPeriod time.Duration
 
 	// CoalescePeriod and QuiescencePeriod control the timers for coalescing
 	// updates from the local state. See the defaults at the top of this
@@ -86,7 +96,8 @@ type Manager struct {
 	// for changes to this value.
 	runState managerRunState
 
 	proxies map[string]Proxy
+	lastSnapshot *snapshot
 }
 
 // NewManager initializes a Manager. After initialization, the exported
@@ -96,6 +107,7 @@ func NewManager() *Manager {
 	var lock sync.Mutex
 	return &Manager{
 		Logger:          defaultLogger,
+		SnapshotPeriod:  ManagerSnapshotPeriod,
 		CoalescePeriod:  ManagerCoalescePeriod,
 		QuiescentPeriod: ManagerQuiescentPeriod,
 		lock:            &lock,
@@ -228,6 +240,12 @@ func (m *Manager) Run() {
 	m.State.NotifyProxy(notifyCh)
 	defer m.State.StopNotifyProxy(notifyCh)
 
+	// Start the timer for snapshots. We don't use a ticker because disk
+	// IO can be slow and we don't want overlapping notifications. So we only
+	// reset the timer once the snapshot is complete rather than continuously.
+	snapshotTimer := time.NewTimer(m.SnapshotPeriod)
+	defer snapshotTimer.Stop()
+
 	m.Logger.Println("[DEBUG] agent/proxy: managed Connect proxy manager started")
 SYNC:
 	for {
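
The comment in the hunk above explains why a one-shot timer is reset after each snapshot instead of using a ticker: a slow disk write must never overlap the next tick. In isolation, that pattern looks roughly like the following sketch (illustrative only; runSnapshots and work are hypothetical names, not part of the commit).

package main

import (
	"log"
	"time"
)

// runSnapshots shows the non-overlapping timer pattern: the timer is reset
// only after the work completes, so slow IO cannot cause runs to pile up
// the way a time.Ticker firing on a fixed schedule could.
func runSnapshots(period time.Duration, work func() error, stopCh <-chan struct{}) {
	t := time.NewTimer(period)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			if err := work(); err != nil {
				log.Printf("[WARN] snapshot failed: %s", err)
			}
			t.Reset(period) // schedule the next run only once this one is done

		case <-stopCh:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go runSnapshots(100*time.Millisecond, func() error { return nil }, stop)
	time.Sleep(500 * time.Millisecond)
	close(stop)
}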
@@ -261,6 +279,17 @@ SYNC:
 		case <-quiescent:
 			continue SYNC
 
+		case <-snapshotTimer.C:
+			// Perform a snapshot
+			if path := m.SnapshotPath(); path != "" {
+				if err := m.snapshot(path, true); err != nil {
+					m.Logger.Printf("[WARN] agent/proxy: failed to snapshot state: %s", err)
+				}
+			}
+
+			// Reset
+			snapshotTimer.Reset(m.SnapshotPeriod)
+
 		case <-stopCh:
 			// Stop immediately, no cleanup
 			m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager")
@@ -342,10 +371,22 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
 	if mp == nil || mp.Proxy == nil {
 		return nil, fmt.Errorf("internal error: nil *local.ManagedProxy or Proxy field")
 	}
 
 	p := mp.Proxy
-	switch p.ExecMode {
-	case structs.ProxyExecModeDaemon:
+
+	// We reuse the service ID a few times
+	id := p.ProxyService.ID
+
+	// Create the Proxy. We could just as easily switch on p.ExecMode
+	// but I wanted there to be only one location where ExecMode => Proxy so
+	// it lowers the chance that it is wrong.
+	proxy, err := m.newProxyFromMode(p.ExecMode, id)
+	if err != nil {
+		return nil, err
+	}
+
+	// Depending on the proxy type we configure the rest from our ManagedProxy
+	switch proxy := proxy.(type) {
+	case *Daemon:
 		command := p.Command
 
 		// This should never happen since validation should happen upstream
@@ -354,9 +395,6 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
 			return nil, fmt.Errorf("daemon mode managed proxy requires command")
 		}
 
-		// We reuse the service ID a few times
-		id := p.ProxyService.ID
-
 		// Build the command to execute.
 		var cmd exec.Cmd
 		cmd.Path = command[0]
@@ -366,18 +404,31 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
 		}
 
 		// Build the daemon structure
-		return &Daemon{
-			Command:    &cmd,
-			ProxyToken: mp.ProxyToken,
-			Logger:     m.Logger,
-			PidPath:    pidPath(filepath.Join(m.DataDir, "pids"), id),
-		}, nil
+		proxy.Command = &cmd
+		proxy.ProxyToken = mp.ProxyToken
+		return proxy, nil
 
 	default:
 		return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
 	}
 }
 
+// newProxyFromMode just initializes the proxy structure from only the mode
+// and the service ID. This is a shared method between newProxy and Restore
+// so that we only have one location where we turn ExecMode into a Proxy.
+func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy, error) {
+	switch mode {
+	case structs.ProxyExecModeDaemon:
+		return &Daemon{
+			Logger:  m.Logger,
+			PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id),
+		}, nil
+
+	default:
+		return nil, fmt.Errorf("unsupported managed proxy type: %q", mode)
+	}
+}
+
 // configureLogDir sets up the file descriptors to stdout/stderr so that
 // they log to the proper file path for the given service ID.
 func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error {
@@ -261,9 +261,98 @@ func TestManagerRun_daemonPid(t *testing.T) {
 	require.NotEmpty(pidRaw)
 }
 
+// Test the Snapshot/Restore works.
+func TestManagerRun_snapshotRestore(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+	state := local.TestState(t)
+	m, closer := testManager(t)
+	defer closer()
+	m.State = state
+	defer m.Kill()
+
+	// Add the proxy
+	td, closer := testTempDir(t)
+	defer closer()
+	path := filepath.Join(td, "file")
+	testStateProxy(t, state, "web", helperProcess("start-stop", path))
+
+	// Set a low snapshot period so we get a snapshot
+	m.SnapshotPeriod = 10 * time.Millisecond
+
+	// Start the manager
+	go m.Run()
+
+	// We should see the path appear shortly
+	retry.Run(t, func(r *retry.R) {
+		_, err := os.Stat(path)
+		if err == nil {
+			return
+		}
+		r.Fatalf("error waiting for path: %s", err)
+	})
+
+	// Wait for the snapshot
+	snapPath := m.SnapshotPath()
+	retry.Run(t, func(r *retry.R) {
+		raw, err := ioutil.ReadFile(snapPath)
+		if err != nil {
+			r.Fatalf("error waiting for path: %s", err)
+		}
+		if len(raw) < 30 {
+			r.Fatalf("snapshot too small")
+		}
+	})
+
+	// Stop the sync
+	require.NoError(m.Close())
+
+	// File should still exist
+	_, err := os.Stat(path)
+	require.NoError(err)
+
+	// Restore a manager from a snapshot
+	m2, closer := testManager(t)
+	m2.State = state
+	defer closer()
+	defer m2.Kill()
+	require.NoError(m2.Restore(snapPath))
+
+	// Start
+	go m2.Run()
+
+	// Add a second proxy so that we can determine when we're up
+	// and running.
+	path2 := filepath.Join(td, "file")
+	testStateProxy(t, state, "db", helperProcess("start-stop", path2))
+	retry.Run(t, func(r *retry.R) {
+		_, err := os.Stat(path2)
+		if err == nil {
+			return
+		}
+		r.Fatalf("error waiting for path: %s", err)
+	})
+
+	// Kill m2, which should kill our main process
+	require.NoError(m2.Kill())
+
+	// File should no longer exist
+	retry.Run(t, func(r *retry.R) {
+		_, err := os.Stat(path)
+		if err != nil {
+			return
+		}
+		r.Fatalf("file still exists")
+	})
+}
+
 func testManager(t *testing.T) (*Manager, func()) {
 	m := NewManager()
 
+	// Setup a default state
+	m.State = local.TestState(t)
+
 	// Set these periods low to speed up tests
 	m.CoalescePeriod = 1 * time.Millisecond
 	m.QuiescentPeriod = 1 * time.Millisecond
@@ -7,6 +7,10 @@
 // for that is available in the "connect/proxy" package.
 package proxy
 
+import (
+	"github.com/hashicorp/consul/agent/structs"
+)
+
 // EnvProxyToken is the name of the environment variable that is passed
 // to managed proxies containing the proxy token.
 const EnvProxyToken = "CONNECT_PROXY_TOKEN"
@@ -16,6 +20,9 @@ const EnvProxyToken = "CONNECT_PROXY_TOKEN"
 // Calls to all the functions on this interface must be concurrency safe.
 // Please read the documentation carefully on top of each function for expected
 // behavior.
+//
+// Whenever a new proxy type is implemented, please also update proxyExecMode
+// and newProxyFromMode and newProxy to support the new proxy.
 type Proxy interface {
 	// Start starts the proxy. If an error is returned then the managed
 	// proxy registration is rejected. Therefore, this should only fail if
@@ -56,3 +63,17 @@ type Proxy interface {
 	MarshalSnapshot() map[string]interface{}
 	UnmarshalSnapshot(map[string]interface{}) error
 }
+
+// proxyExecMode returns the ProxyExecMode for a Proxy instance.
+func proxyExecMode(p Proxy) structs.ProxyExecMode {
+	switch p.(type) {
+	case *Daemon:
+		return structs.ProxyExecModeDaemon
+
+	case *Noop:
+		return structs.ProxyExecModeTest
+
+	default:
+		return structs.ProxyExecModeUnspecified
+	}
+}
@@ -1,7 +1,15 @@
 package proxy
 
 import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"reflect"
+
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/lib/file"
 )
 
 // snapshot is the structure of the snapshot file. This is unexported because
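
The new lib/file import is used below for file.WriteAtomic when persisting the snapshot. The general technique, writing the contents to a temporary file and renaming it over the destination so readers never see a partial snapshot, is sketched here for context; this is an illustration of the idea, not the actual lib/file implementation.

package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeAtomic is an illustrative stand-in for an atomic file write: data is
// written and synced to a temp file in the same directory, then renamed over
// the destination in a single step.
func writeAtomic(path string, contents []byte) error {
	tmp, err := ioutil.TempFile(filepath.Dir(path), "tmp-snapshot-")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // best-effort cleanup; harmless after a successful rename

	if _, err := tmp.Write(contents); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush to disk before the rename
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

func main() {
	_ = writeAtomic("/tmp/snapshot.json", []byte(`{"Version":1}`))
}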
@@ -17,7 +25,7 @@ type snapshot struct {
 	Version int
 
 	// Proxies are the set of proxies that the manager has.
-	Proxies []snapshotProxy
+	Proxies map[string]snapshotProxy
 }
 
 // snapshotProxy represents a single proxy.
|
|||||||
// implementation uses to restore state.
|
// implementation uses to restore state.
|
||||||
Config map[string]interface{}
|
Config map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// snapshotVersion is the current version to encode within the snapshot.
|
||||||
|
const snapshotVersion = 1
|
||||||
|
|
||||||
|
// SnapshotPath returns the default snapshot path for this manager. This
|
||||||
|
// will return empty if DataDir is not set. This file may not exist yet.
|
||||||
|
func (m *Manager) SnapshotPath() string {
|
||||||
|
if m.DataDir == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return filepath.Join(m.DataDir, "snapshot.json")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot will persist a snapshot of the proxy manager state that
|
||||||
|
// can be restored with Restore.
|
||||||
|
//
|
||||||
|
// If DataDir is non-empty, then the Manager will automatically snapshot
|
||||||
|
// whenever the set of managed proxies changes. This method generally doesn't
|
||||||
|
// need to be called manually.
|
||||||
|
func (m *Manager) Snapshot(path string) error {
|
||||||
|
m.lock.Lock()
|
||||||
|
defer m.lock.Unlock()
|
||||||
|
return m.snapshot(path, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// snapshot is the internal function analogous to Snapshot but expects
|
||||||
|
// a lock to already be held.
|
||||||
|
//
|
||||||
|
// checkDup when set will store the snapshot on lastSnapshot and use
|
||||||
|
// reflect.DeepEqual to verify that its not writing an identical snapshot.
|
||||||
|
func (m *Manager) snapshot(path string, checkDup bool) error {
|
||||||
|
// Build the snapshot
|
||||||
|
s := snapshot{
|
||||||
|
Version: snapshotVersion,
|
||||||
|
Proxies: make(map[string]snapshotProxy, len(m.proxies)),
|
||||||
|
}
|
||||||
|
for id, p := range m.proxies {
|
||||||
|
// Get the snapshot configuration. If the configuration is nil or
|
||||||
|
// empty then we don't persist this proxy.
|
||||||
|
config := p.MarshalSnapshot()
|
||||||
|
if len(config) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Proxies[id] = snapshotProxy{
|
||||||
|
Mode: proxyExecMode(p),
|
||||||
|
Config: config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dup detection, if the snapshot is identical to the last, do nothing
|
||||||
|
if checkDup && reflect.DeepEqual(m.lastSnapshot, &s) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode as JSON
|
||||||
|
encoded, err := json.Marshal(&s)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the file
|
||||||
|
err = file.WriteAtomic(path, encoded)
|
||||||
|
if err == nil && checkDup {
|
||||||
|
m.lastSnapshot = &s
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore restores the manager state from a snapshot at path. If path
|
||||||
|
// doesn't exist, this does nothing and no error is returned.
|
||||||
|
//
|
||||||
|
// This restores proxy state but does not restore any Manager configuration
|
||||||
|
// such as DataDir, Logger, etc. All of those should be set _before_ Restore
|
||||||
|
// is called.
|
||||||
|
//
|
||||||
|
// Restore must be called before Run. Restore will immediately start
|
||||||
|
// supervising the restored processes but will not sync with the local
|
||||||
|
// state store until Run is called.
|
||||||
|
//
|
||||||
|
// If an error is returned the manager state is left untouched.
|
||||||
|
func (m *Manager) Restore(path string) error {
|
||||||
|
buf, err := ioutil.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var s snapshot
|
||||||
|
if err := json.Unmarshal(buf, &s); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the version matches so we can be more confident that we're
|
||||||
|
// decoding a structure that we expect.
|
||||||
|
if s.Version != snapshotVersion {
|
||||||
|
return fmt.Errorf("unknown snapshot version, expecting %d", snapshotVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the proxies from the snapshot
|
||||||
|
proxies := make(map[string]Proxy, len(s.Proxies))
|
||||||
|
for id, sp := range s.Proxies {
|
||||||
|
p, err := m.newProxyFromMode(sp.Mode, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.UnmarshalSnapshot(sp.Config); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
proxies[id] = p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Overwrite the proxies. The documentation notes that this will happen.
|
||||||
|
m.lock.Lock()
|
||||||
|
defer m.lock.Unlock()
|
||||||
|
m.proxies = proxies
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
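
Taken together with the manager changes above, a restart path that uses the new Restore API might look roughly like the sketch below. The ordering (Restore before Run) follows the doc comment on Restore; the import path, the example path, and the omitted State wiring are assumptions rather than part of the commit.

package main

import (
	"log"

	"github.com/hashicorp/consul/agent/proxy" // assumed import path
)

func main() {
	m := proxy.NewManager()
	m.DataDir = "/var/lib/consul/proxy" // illustrative; setting DataDir enables snapshotting

	// On startup, restore any previously snapshotted proxies. A missing
	// snapshot file is not an error; Restore simply does nothing.
	if err := m.Restore(m.SnapshotPath()); err != nil {
		log.Fatalf("failed to restore proxy snapshot: %s", err)
	}

	// Restore must be called before Run. Run then syncs with the local
	// state (m.State wiring is omitted in this sketch) and keeps writing
	// periodic snapshots to m.SnapshotPath().
	go m.Run()

	// ... normal agent lifecycle ...

	// Close stops the manager's background work while leaving the managed
	// proxy processes running; Kill would stop the proxies as well.
	if err := m.Close(); err != nil {
		log.Printf("[WARN] closing proxy manager: %s", err)
	}
}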