mirror of https://github.com/status-im/consul.git
agent/proxy: implement snapshotting for daemons
This commit is contained in:
parent
b7580f4fad
commit
0e8c0b7b48
|
@ -11,6 +11,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/lib/file"
|
"github.com/hashicorp/consul/lib/file"
|
||||||
|
"github.com/mitchellh/mapstructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Constants related to restart timers with the daemon mode proxies. At some
|
// Constants related to restart timers with the daemon mode proxies. At some
|
||||||
|
@ -261,6 +262,26 @@ func (p *Daemon) Stop() error {
|
||||||
return process.Kill()
|
return process.Kill()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stopKeepAlive is like Stop but keeps the process running. This is
|
||||||
|
// used only for tests.
|
||||||
|
func (p *Daemon) stopKeepAlive() error {
|
||||||
|
p.lock.Lock()
|
||||||
|
|
||||||
|
// If we're already stopped or never started, then no problem.
|
||||||
|
if p.stopped || p.process == nil {
|
||||||
|
p.stopped = true
|
||||||
|
p.lock.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note that we've stopped
|
||||||
|
p.stopped = true
|
||||||
|
close(p.stopCh)
|
||||||
|
p.lock.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Equal implements Proxy to check for equality.
|
// Equal implements Proxy to check for equality.
|
||||||
func (p *Daemon) Equal(raw Proxy) bool {
|
func (p *Daemon) Equal(raw Proxy) bool {
|
||||||
p2, ok := raw.(*Daemon)
|
p2, ok := raw.(*Daemon)
|
||||||
|
@ -275,3 +296,81 @@ func (p *Daemon) Equal(raw Proxy) bool {
|
||||||
reflect.DeepEqual(p.Command.Args, p2.Command.Args) &&
|
reflect.DeepEqual(p.Command.Args, p2.Command.Args) &&
|
||||||
reflect.DeepEqual(p.Command.Env, p2.Command.Env)
|
reflect.DeepEqual(p.Command.Env, p2.Command.Env)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MarshalSnapshot implements Proxy
|
||||||
|
func (p *Daemon) MarshalSnapshot() map[string]interface{} {
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
// If we're stopped or have no process, then nothing to snapshot.
|
||||||
|
if p.stopped || p.process == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return map[string]interface{}{
|
||||||
|
"Pid": p.process.Pid,
|
||||||
|
"CommandPath": p.Command.Path,
|
||||||
|
"CommandArgs": p.Command.Args,
|
||||||
|
"CommandDir": p.Command.Dir,
|
||||||
|
"CommandEnv": p.Command.Env,
|
||||||
|
"ProxyToken": p.ProxyToken,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalSnapshot implements Proxy
|
||||||
|
func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error {
|
||||||
|
var s daemonSnapshot
|
||||||
|
if err := mapstructure.Decode(m, &s); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
// Set the basic fields
|
||||||
|
p.ProxyToken = s.ProxyToken
|
||||||
|
p.Command = &exec.Cmd{
|
||||||
|
Path: s.CommandPath,
|
||||||
|
Args: s.CommandArgs,
|
||||||
|
Dir: s.CommandDir,
|
||||||
|
Env: s.CommandEnv,
|
||||||
|
}
|
||||||
|
|
||||||
|
// For the pid, we want to find the process.
|
||||||
|
proc, err := os.FindProcess(s.Pid)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(mitchellh): we should check if proc refers to a process that
|
||||||
|
// is currently alive. If not, we should return here and not manage the
|
||||||
|
// process.
|
||||||
|
|
||||||
|
// "Start it"
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
exitedCh := make(chan struct{})
|
||||||
|
p.stopCh = stopCh
|
||||||
|
p.exitedCh = exitedCh
|
||||||
|
p.process = proc
|
||||||
|
go p.keepAlive(stopCh, exitedCh)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// daemonSnapshot describes the data a Daemon writes to, and reads back from,
// a snapshot.
type daemonSnapshot struct {
	// Pid is the process ID of the daemon process. It is the only value
	// strictly needed to regain management control; the remaining values
	// exist so that Equal can be evaluated.
	Pid int

	// Command details: path, arguments, working directory and environment.
	CommandPath string
	CommandArgs []string
	CommandDir  string
	CommandEnv  []string

	// ProxyToken is stored in full.
	//
	// NOTE(mitchellh): longer term there are discussions/plans to only
	// store the hash of the token but for now we need the full token in
	// case the process dies and has to be restarted.
	ProxyToken string
}
|
||||||
|
|
|
@ -316,3 +316,95 @@ func TestDaemonEqual(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDaemonMarshalSnapshot(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
Name string
|
||||||
|
Proxy Proxy
|
||||||
|
Expected map[string]interface{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"stopped daemon",
|
||||||
|
&Daemon{
|
||||||
|
Command: &exec.Cmd{Path: "/foo"},
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
"basic",
|
||||||
|
&Daemon{
|
||||||
|
Command: &exec.Cmd{Path: "/foo"},
|
||||||
|
process: &os.Process{Pid: 42},
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"Pid": 42,
|
||||||
|
"CommandPath": "/foo",
|
||||||
|
"CommandArgs": []string(nil),
|
||||||
|
"CommandDir": "",
|
||||||
|
"CommandEnv": []string(nil),
|
||||||
|
"ProxyToken": "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
|
actual := tc.Proxy.MarshalSnapshot()
|
||||||
|
require.Equal(t, tc.Expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDaemonUnmarshalSnapshot(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
require := require.New(t)
|
||||||
|
td, closer := testTempDir(t)
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
path := filepath.Join(td, "file")
|
||||||
|
uuid, err := uuid.GenerateUUID()
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
d := &Daemon{
|
||||||
|
Command: helperProcess("start-stop", path),
|
||||||
|
ProxyToken: uuid,
|
||||||
|
Logger: testLogger,
|
||||||
|
}
|
||||||
|
require.NoError(d.Start())
|
||||||
|
|
||||||
|
// Wait for the file to exist
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, err := os.Stat(path)
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Fatalf("error: %s", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Snapshot
|
||||||
|
snap := d.MarshalSnapshot()
|
||||||
|
|
||||||
|
// Stop the original daemon but keep it alive
|
||||||
|
require.NoError(d.stopKeepAlive())
|
||||||
|
|
||||||
|
// Restore the second daemon
|
||||||
|
d2 := &Daemon{Logger: testLogger}
|
||||||
|
require.NoError(d2.UnmarshalSnapshot(snap))
|
||||||
|
|
||||||
|
// Stop the process
|
||||||
|
require.NoError(d2.Stop())
|
||||||
|
|
||||||
|
// File should no longer exist.
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, err := os.Stat(path)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// err might be nil here but that's okay
|
||||||
|
r.Fatalf("should not exist: %s", err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ type Manager struct {
|
||||||
//
|
//
|
||||||
// * logs/ - log files named <service id>-std{out|err}.log
|
// * logs/ - log files named <service id>-std{out|err}.log
|
||||||
// * pids/ - pid files for daemons named <service id>.pid
|
// * pids/ - pid files for daemons named <service id>.pid
|
||||||
// * state.ext - the state of the manager
|
// * snapshot.json - the state of the manager
|
||||||
//
|
//
|
||||||
DataDir string
|
DataDir string
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,8 @@ package proxy
|
||||||
// Noop implements Proxy and does nothing.
|
// Noop implements Proxy and does nothing.
|
||||||
type Noop struct{}
|
type Noop struct{}
|
||||||
|
|
||||||
func (p *Noop) Start() error { return nil }
|
func (p *Noop) Start() error { return nil }
|
||||||
func (p *Noop) Stop() error { return nil }
|
func (p *Noop) Stop() error { return nil }
|
||||||
func (p *Noop) Equal(Proxy) bool { return true }
|
func (p *Noop) Equal(Proxy) bool { return true }
|
||||||
|
func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
|
||||||
|
func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }
|
||||||
|
|
|
@ -39,4 +39,20 @@ type Proxy interface {
|
||||||
// If Equal returns true, the old proxy will remain running and the new
|
// If Equal returns true, the old proxy will remain running and the new
|
||||||
// one will be ignored.
|
// one will be ignored.
|
||||||
Equal(Proxy) bool
|
Equal(Proxy) bool
|
||||||
|
|
||||||
|
// MarshalSnapshot returns the state that will be stored in a snapshot
|
||||||
|
// so that Consul can recover the proxy process after a restart. The
|
||||||
|
// result should only contain primitive values and containers (lists/maps).
|
||||||
|
//
|
||||||
|
// UnmarshalSnapshot is called to restore the receiving Proxy from its
|
||||||
|
// marshalled state. If UnmarshalSnapshot returns an error, the snapshot
|
||||||
|
// is ignored and the marshalled snapshot will be lost. The manager will
|
||||||
|
// log.
|
||||||
|
//
|
||||||
|
// This should save/restore enough state to be able to regain management
|
||||||
|
// of a proxy process as well as to perform the Equal method above. The
|
||||||
|
// Equal method will be called when a local state sync happens to determine
|
||||||
|
// if the recovered process should be restarted or not.
|
||||||
|
MarshalSnapshot() map[string]interface{}
|
||||||
|
UnmarshalSnapshot(map[string]interface{}) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// snapshot is the structure of the snapshot file. This is unexported because
|
||||||
|
// we don't want this being a public API.
|
||||||
|
//
|
||||||
|
// The snapshot doesn't contain any configuration for the manager. We only
|
||||||
|
// want to restore the proxies that we're managing, and we use the config
|
||||||
|
// set at runtime to sync and reconcile what proxies we should start,
|
||||||
|
// restart, stop, or have already running.
|
||||||
|
type snapshot struct {
|
||||||
|
// Version is the version of the snapshot format and can be used
|
||||||
|
// to safely update the format in the future if necessary.
|
||||||
|
Version int
|
||||||
|
|
||||||
|
// Proxies are the set of proxies that the manager has.
|
||||||
|
Proxies []snapshotProxy
|
||||||
|
}
|
||||||
|
|
||||||
|
// snapshotProxy represents a single proxy.
|
||||||
|
type snapshotProxy struct {
|
||||||
|
// Mode corresponds to the type of proxy running.
|
||||||
|
Mode structs.ProxyExecMode
|
||||||
|
|
||||||
|
// Config is an opaque mapping of primitive values that the proxy
|
||||||
|
// implementation uses to restore state.
|
||||||
|
Config map[string]interface{}
|
||||||
|
}
|
Loading…
Reference in New Issue