From 0e8c0b7b4803664ce2bd6b510e13ba0532bb3337 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Thu, 3 May 2018 15:46:00 -0700 Subject: [PATCH] agent/proxy: implement snapshotting for daemons --- agent/proxy/daemon.go | 99 ++++++++++++++++++++++++++++++++++++++ agent/proxy/daemon_test.go | 92 +++++++++++++++++++++++++++++++++++ agent/proxy/manager.go | 2 +- agent/proxy/noop.go | 8 +-- agent/proxy/proxy.go | 16 ++++++ agent/proxy/snapshot.go | 31 ++++++++++++ 6 files changed, 244 insertions(+), 4 deletions(-) create mode 100644 agent/proxy/snapshot.go diff --git a/agent/proxy/daemon.go b/agent/proxy/daemon.go index e3b376c057..e1ec2e1b08 100644 --- a/agent/proxy/daemon.go +++ b/agent/proxy/daemon.go @@ -11,6 +11,7 @@ import ( "time" "github.com/hashicorp/consul/lib/file" + "github.com/mitchellh/mapstructure" ) // Constants related to restart timers with the daemon mode proxies. At some @@ -261,6 +262,26 @@ func (p *Daemon) Stop() error { return process.Kill() } +// stopKeepAlive is like Stop but keeps the process running. This is +// used only for tests. +func (p *Daemon) stopKeepAlive() error { + p.lock.Lock() + + // If we're already stopped or never started, then no problem. + if p.stopped || p.process == nil { + p.stopped = true + p.lock.Unlock() + return nil + } + + // Note that we've stopped + p.stopped = true + close(p.stopCh) + p.lock.Unlock() + + return nil +} + // Equal implements Proxy to check for equality. func (p *Daemon) Equal(raw Proxy) bool { p2, ok := raw.(*Daemon) @@ -275,3 +296,81 @@ func (p *Daemon) Equal(raw Proxy) bool { reflect.DeepEqual(p.Command.Args, p2.Command.Args) && reflect.DeepEqual(p.Command.Env, p2.Command.Env) } + +// MarshalSnapshot implements Proxy +func (p *Daemon) MarshalSnapshot() map[string]interface{} { + p.lock.Lock() + defer p.lock.Unlock() + + // If we're stopped or have no process, then nothing to snapshot. 
+ if p.stopped || p.process == nil { + return nil + } + + return map[string]interface{}{ + "Pid": p.process.Pid, + "CommandPath": p.Command.Path, + "CommandArgs": p.Command.Args, + "CommandDir": p.Command.Dir, + "CommandEnv": p.Command.Env, + "ProxyToken": p.ProxyToken, + } +} + +// UnmarshalSnapshot implements Proxy +func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error { + var s daemonSnapshot + if err := mapstructure.Decode(m, &s); err != nil { + return err + } + + p.lock.Lock() + defer p.lock.Unlock() + + // Set the basic fields + p.ProxyToken = s.ProxyToken + p.Command = &exec.Cmd{ + Path: s.CommandPath, + Args: s.CommandArgs, + Dir: s.CommandDir, + Env: s.CommandEnv, + } + + // For the pid, we want to find the process. + proc, err := os.FindProcess(s.Pid) + if err != nil { + return err + } + + // TODO(mitchellh): we should check if proc refers to a process that + // is currently alive. If not, we should return here and not manage the + // process. + + // "Start it" + stopCh := make(chan struct{}) + exitedCh := make(chan struct{}) + p.stopCh = stopCh + p.exitedCh = exitedCh + p.process = proc + go p.keepAlive(stopCh, exitedCh) + + return nil +} + +// daemonSnapshot is the structure of the marshalled data for snapshotting. +type daemonSnapshot struct { + // Pid of the process. This is the only value actually required to + // regain management control. The remaining values are for Equal. + Pid int + + // Command information + CommandPath string + CommandArgs []string + CommandDir string + CommandEnv []string + + // NOTE(mitchellh): longer term there are discussions/plans to only + // store the hash of the token but for now we need the full token in + // case the process dies and has to be restarted. 
+ ProxyToken string +} diff --git a/agent/proxy/daemon_test.go b/agent/proxy/daemon_test.go index 652364c5ea..6e74cdf885 100644 --- a/agent/proxy/daemon_test.go +++ b/agent/proxy/daemon_test.go @@ -316,3 +316,95 @@ func TestDaemonEqual(t *testing.T) { }) } } + +func TestDaemonMarshalSnapshot(t *testing.T) { + cases := []struct { + Name string + Proxy Proxy + Expected map[string]interface{} + }{ + { + "stopped daemon", + &Daemon{ + Command: &exec.Cmd{Path: "/foo"}, + }, + nil, + }, + + { + "basic", + &Daemon{ + Command: &exec.Cmd{Path: "/foo"}, + process: &os.Process{Pid: 42}, + }, + map[string]interface{}{ + "Pid": 42, + "CommandPath": "/foo", + "CommandArgs": []string(nil), + "CommandDir": "", + "CommandEnv": []string(nil), + "ProxyToken": "", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + actual := tc.Proxy.MarshalSnapshot() + require.Equal(t, tc.Expected, actual) + }) + } +} + +func TestDaemonUnmarshalSnapshot(t *testing.T) { + t.Parallel() + + require := require.New(t) + td, closer := testTempDir(t) + defer closer() + + path := filepath.Join(td, "file") + uuid, err := uuid.GenerateUUID() + require.NoError(err) + + d := &Daemon{ + Command: helperProcess("start-stop", path), + ProxyToken: uuid, + Logger: testLogger, + } + require.NoError(d.Start()) + + // Wait for the file to exist + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // Snapshot + snap := d.MarshalSnapshot() + + // Stop the original daemon but keep it alive + require.NoError(d.stopKeepAlive()) + + // Restore the second daemon + d2 := &Daemon{Logger: testLogger} + require.NoError(d2.UnmarshalSnapshot(snap)) + + // Stop the process + require.NoError(d2.Stop()) + + // File should no longer exist. 
+ retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if os.IsNotExist(err) { + return + } + + // err might be nil here but that's okay + r.Fatalf("should not exist: %s", err) + }) +} diff --git a/agent/proxy/manager.go b/agent/proxy/manager.go index f5c2d996ef..c9c63311f1 100644 --- a/agent/proxy/manager.go +++ b/agent/proxy/manager.go @@ -60,7 +60,7 @@ type Manager struct { // // * logs/ - log files named -std{out|err}.log // * pids/ - pid files for daemons named .pid - // * state.ext - the state of the manager + // * snapshot.json - the state of the manager // DataDir string diff --git a/agent/proxy/noop.go b/agent/proxy/noop.go index 9ce0135546..a96425d848 100644 --- a/agent/proxy/noop.go +++ b/agent/proxy/noop.go @@ -3,6 +3,8 @@ package proxy // Noop implements Proxy and does nothing. type Noop struct{} -func (p *Noop) Start() error { return nil } -func (p *Noop) Stop() error { return nil } -func (p *Noop) Equal(Proxy) bool { return true } +func (p *Noop) Start() error { return nil } +func (p *Noop) Stop() error { return nil } +func (p *Noop) Equal(Proxy) bool { return true } +func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil } +func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil } diff --git a/agent/proxy/proxy.go b/agent/proxy/proxy.go index 549a6ee26d..e1bad92c0e 100644 --- a/agent/proxy/proxy.go +++ b/agent/proxy/proxy.go @@ -39,4 +39,20 @@ type Proxy interface { // If Equal returns true, the old proxy will remain running and the new // one will be ignored. Equal(Proxy) bool + + // MarshalSnapshot returns the state that will be stored in a snapshot + // so that Consul can recover the proxy process after a restart. The + // result should only contain primitive values and containers (lists/maps). + // + // UnmarshalSnapshot is called to restore the receiving Proxy from its + // marshalled state. If UnmarshalSnapshot returns an error, the snapshot + // is ignored and the marshalled snapshot will be lost. 
The manager will + // log. + // + // This should save/restore enough state to be able to regain management + // of a proxy process as well as to perform the Equal method above. The + // Equal method will be called when a local state sync happens to determine + // if the recovered process should be restarted or not. + MarshalSnapshot() map[string]interface{} + UnmarshalSnapshot(map[string]interface{}) error } diff --git a/agent/proxy/snapshot.go b/agent/proxy/snapshot.go new file mode 100644 index 0000000000..b119bfddf6 --- /dev/null +++ b/agent/proxy/snapshot.go @@ -0,0 +1,31 @@ +package proxy + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +// snapshot is the structure of the snapshot file. This is unexported because +// we don't want this being a public API. +// +// The snapshot doesn't contain any configuration for the manager. We only +// want to restore the proxies that we're managing, and we use the config +// set at runtime to sync and reconcile what proxies we should start, +// restart, stop, or have already running. +type snapshot struct { + // Version is the version of the snapshot format and can be used + // to safely update the format in the future if necessary. + Version int + + // Proxies are the set of proxies that the manager has. + Proxies []snapshotProxy +} + +// snapshotProxy represents a single proxy. +type snapshotProxy struct { + // Mode corresponds to the type of proxy running. + Mode structs.ProxyExecMode + + // Config is an opaque mapping of primitive values that the proxy + // implementation uses to restore state. + Config map[string]interface{} +}