Persist proxy state through agent restart

This commit is contained in:
Paul Banks 2018-05-14 13:55:24 -07:00 committed by Jack Pearkes
parent eb3fcb39b3
commit e21723a891
5 changed files with 339 additions and 23 deletions

View File

@ -50,6 +50,9 @@ const (
// Path to save agent service definitions
servicesDir = "services"
// Path to save agent proxy definitions
proxyDir = "proxies"
// Path to save local agent checks
checksDir = "checks"
checkStateDir = "checks/state"
@ -1606,6 +1609,39 @@ func (a *Agent) purgeService(serviceID string) error {
return nil
}
// persistedProxy is used to wrap a proxy definition and bundle it with an Proxy
// token so we can continue to authenticate the running proxy after a restart.
type persistedProxy struct {
ProxyToken string
Proxy *structs.ConnectManagedProxy
}
// persistProxy saves a proxy definition to a JSON file in the data dir
func (a *Agent) persistProxy(proxy *local.ManagedProxy) error {
proxyPath := filepath.Join(a.config.DataDir, proxyDir,
stringHash(proxy.Proxy.ProxyService.ID))
wrapped := persistedProxy{
ProxyToken: proxy.ProxyToken,
Proxy: proxy.Proxy,
}
encoded, err := json.Marshal(wrapped)
if err != nil {
return err
}
return file.WriteAtomic(proxyPath, encoded)
}
// purgeProxy removes a persisted proxy definition file from the data dir
func (a *Agent) purgeProxy(proxyID string) error {
proxyPath := filepath.Join(a.config.DataDir, proxyDir, stringHash(proxyID))
if _, err := os.Stat(proxyPath); err == nil {
return os.Remove(proxyPath)
}
return nil
}
// persistCheck saves a check definition to the local agent's state directory
func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckType) error {
checkPath := filepath.Join(a.config.DataDir, checksDir, checkIDHash(check.CheckID))
@ -2048,7 +2084,13 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
// since this is only ever called when setting up a _managed_ proxy which was
// registered as part of a service registration either from config or HTTP API
// call.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error {
//
// The restoredProxyToken argument should only be used when restoring proxy
// definitions from disk; new proxies must leave it blank to get a new token
// assigned. We need to restore from disk to enable to continue authenticating
// running proxies that already had that credential injected.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
restoredProxyToken string) error {
// Lookup the target service token in state if there is one.
token := a.State.ServiceToken(proxy.TargetServiceID)
@ -2064,7 +2106,7 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error
// NodeService for the proxy populated with the allocated (or configured) port
// and an ID, but it doesn't add it to the agent directly since that could
// deadlock and we may need to coordinate adding it and persisting etc.
proxyState, err := a.State.AddProxy(proxy, token)
proxyState, err := a.State.AddProxy(proxy, token, restoredProxyToken)
if err != nil {
return err
}
@ -2078,7 +2120,10 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error
return err
}
// TODO(banks): persist some of the local proxy state (not the _proxy_ token).
// Persist the proxy
if persist && !a.config.DevMode {
return a.persistProxy(proxyState)
}
return nil
}
@ -2135,7 +2180,9 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
return err
}
// TODO(banks): unpersist proxy
if persist && !a.config.DevMode {
return a.purgeProxy(proxyID)
}
return nil
}
@ -2615,13 +2662,71 @@ func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
if proxy == nil {
continue
}
if err := a.AddProxy(proxy, false); err != nil {
if err := a.AddProxy(proxy, false, ""); err != nil {
return fmt.Errorf("failed adding proxy: %s", err)
}
}
}
// TODO(banks): persist proxy state and re-load it here?
// Load any persisted proxies
proxyDir := filepath.Join(a.config.DataDir, proxyDir)
files, err := ioutil.ReadDir(proxyDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err)
}
for _, fi := range files {
// Skip all dirs
if fi.IsDir() {
continue
}
// Skip all partially written temporary files
if strings.HasSuffix(fi.Name(), "tmp") {
a.logger.Printf("[WARN] agent: Ignoring temporary proxy file %v", fi.Name())
continue
}
// Open the file for reading
file := filepath.Join(proxyDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed opening proxy file %q: %s", file, err)
}
// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
if err != nil {
return fmt.Errorf("failed reading proxy file %q: %s", file, err)
}
// Try decoding the proxy definition
var p persistedProxy
if err := json.Unmarshal(buf, &p); err != nil {
a.logger.Printf("[ERR] agent: Failed decoding proxy file %q: %s", file, err)
continue
}
proxyID := p.Proxy.ProxyService.ID
if a.State.Proxy(proxyID) != nil {
// Purge previously persisted proxy. This allows config to be preferred
// over services persisted from the API.
a.logger.Printf("[DEBUG] agent: proxy %q exists, not restoring from %q",
proxyID, file)
if err := a.purgeProxy(proxyID); err != nil {
return fmt.Errorf("failed purging proxy %q: %s", proxyID, err)
}
} else {
a.logger.Printf("[DEBUG] agent: restored proxy definition %q from %q",
proxyID, file)
if err := a.AddProxy(p.Proxy, false, p.ProxyToken); err != nil {
return fmt.Errorf("failed adding proxy %q: %s", proxyID, err)
}
}
}
return nil
}

View File

@ -629,7 +629,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
}
// Add proxy (which will add proxy service so do it before we trigger sync)
if proxy != nil {
if err := s.agent.AddProxy(proxy, true); err != nil {
if err := s.agent.AddProxy(proxy, true, ""); err != nil {
return nil, err
}
}

View File

@ -73,7 +73,7 @@ func TestAgent_Services(t *testing.T) {
},
TargetServiceID: "mysql",
}
_, err := a.State.AddProxy(prxy1, "")
_, err := a.State.AddProxy(prxy1, "", "")
require.NoError(t, err)
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)

View File

@ -26,6 +26,7 @@ import (
"github.com/hashicorp/consul/types"
uuid "github.com/hashicorp/go-uuid"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -1350,6 +1351,187 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
}
}
func TestAgent_PersistProxy(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
cfg := `
server = false
bootstrap = false
data_dir = "` + dataDir + `"
`
a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a.Start()
defer os.RemoveAll(dataDir)
defer a.Shutdown()
require := require.New(t)
assert := assert.New(t)
// Add a service to proxy (precondition for AddProxy)
svc1 := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
require.NoError(a.AddService(svc1, nil, true, ""))
// Add a proxy for it
proxy := &structs.ConnectManagedProxy{
TargetServiceID: svc1.ID,
Command: []string{"/bin/sleep", "3600"},
}
file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy"))
// Proxy is not persisted unless requested
require.NoError(a.AddProxy(proxy, false, ""))
_, err := os.Stat(file)
require.Error(err, "proxy should not be persisted")
// Proxy is persisted if requested
require.NoError(a.AddProxy(proxy, true, ""))
_, err = os.Stat(file)
require.NoError(err, "proxy should be persisted")
content, err := ioutil.ReadFile(file)
require.NoError(err)
var gotProxy persistedProxy
require.NoError(json.Unmarshal(content, &gotProxy))
assert.Equal(proxy.Command, gotProxy.Proxy.Command)
assert.Len(gotProxy.ProxyToken, 36) // sanity check for UUID
// Updates service definition on disk
proxy.Config = map[string]interface{}{
"foo": "bar",
}
require.NoError(a.AddProxy(proxy, true, ""))
content, err = ioutil.ReadFile(file)
require.NoError(err)
require.NoError(json.Unmarshal(content, &gotProxy))
assert.Equal(gotProxy.Proxy.Command, proxy.Command)
assert.Equal(gotProxy.Proxy.Config, proxy.Config)
assert.Len(gotProxy.ProxyToken, 36) // sanity check for UUID
a.Shutdown()
// Should load it back during later start
a2 := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a2.Start()
defer a2.Shutdown()
restored := a2.State.Proxy("redis-proxy")
require.NotNil(restored)
assert.Equal(gotProxy.ProxyToken, restored.ProxyToken)
// Ensure the port that was auto picked at random is the same again
assert.Equal(gotProxy.Proxy.ProxyService.Port, restored.Proxy.ProxyService.Port)
assert.Equal(gotProxy.Proxy.Command, restored.Proxy.Command)
}
func TestAgent_PurgeProxy(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
require := require.New(t)
// Add a service to proxy (precondition for AddProxy)
svc1 := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
require.NoError(a.AddService(svc1, nil, true, ""))
// Add a proxy for it
proxy := &structs.ConnectManagedProxy{
TargetServiceID: svc1.ID,
Command: []string{"/bin/sleep", "3600"},
}
proxyID := "redis-proxy"
require.NoError(a.AddProxy(proxy, true, ""))
file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy"))
// Not removed
require.NoError(a.RemoveProxy(proxyID, false))
_, err := os.Stat(file)
require.NoError(err, "should not be removed")
// Re-add the proxy
require.NoError(a.AddProxy(proxy, true, ""))
// Removed
require.NoError(a.RemoveProxy(proxyID, true))
_, err = os.Stat(file)
require.Error(err, "should be removed")
}
func TestAgent_PurgeProxyOnDuplicate(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
cfg := `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
`
a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a.Start()
defer a.Shutdown()
defer os.RemoveAll(dataDir)
require := require.New(t)
// Add a service to proxy (precondition for AddProxy)
svc1 := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
require.NoError(a.AddService(svc1, nil, true, ""))
// Add a proxy for it
proxy := &structs.ConnectManagedProxy{
TargetServiceID: svc1.ID,
Command: []string{"/bin/sleep", "3600"},
}
proxyID := "redis-proxy"
require.NoError(a.AddProxy(proxy, true, ""))
a.Shutdown()
// Try bringing the agent back up with the service already
// existing in the config
a2 := &TestAgent{Name: t.Name() + "-a2", HCL: cfg + `
service = {
id = "redis"
name = "redis"
tags = ["bar"]
port = 9000
connect {
proxy {
command = ["/bin/sleep", "3600"]
}
}
}
`, DataDir: dataDir}
a2.Start()
defer a2.Shutdown()
file := filepath.Join(a.Config.DataDir, proxyDir, stringHash(proxyID))
_, err := os.Stat(file)
require.Error(err, "should have removed remote state")
result := a2.State.Proxy(proxyID)
require.NotNil(result)
require.Equal(proxy.Command, result.Proxy.Command)
}
func TestAgent_PersistCheck(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
@ -2482,7 +2664,7 @@ func TestAgent_AddProxy(t *testing.T) {
t.Run(tt.desc, func(t *testing.T) {
require := require.New(t)
err := a.AddProxy(tt.proxy, false)
err := a.AddProxy(tt.proxy, false, "")
if tt.wantErr {
require.Error(err)
return
@ -2522,7 +2704,7 @@ func TestAgent_RemoveProxy(t *testing.T) {
ExecMode: structs.ProxyExecModeDaemon,
Command: []string{"foo"},
}
require.NoError(a.AddProxy(pReg, false))
require.NoError(a.AddProxy(pReg, false, ""))
// Test the ID was created as we expect.
gotProxy := a.State.Proxy("web-proxy")

View File

@ -118,12 +118,19 @@ type ManagedProxy struct {
// ProxyToken is a special local-only security token that grants the bearer
// access to the proxy's config as well as allowing it to request certificates
// on behalf of the TargetService. Certain connect endpoints will validate
// against this token and if it matches will then use the TargetService.Token
// to actually authenticate the upstream RPC on behalf of the service. This
// token is passed securely to the proxy process via ENV vars and should never
// be exposed any other way. Unmanaged proxies will never see this and need to
// use service-scoped ACL tokens distributed externally.
// on behalf of the target service. Certain connect endpoints will validate
// against this token and if it matches will then use the target service's
// registration token to actually authenticate the upstream RPC on behalf of
// the service. This token is passed securely to the proxy process via ENV
// vars and should never be exposed any other way. Unmanaged proxies will
// never see this and need to use service-scoped ACL tokens distributed
// externally. It is persisted in the local state to allow authenticating
// running proxies after the agent restarts.
//
// TODO(banks): In theory we only need to persist this at all to _validate_
// which means we could keep only a hash in memory and on disk and only pass
// the actual token to the process on startup. That would require a bit of
// refactoring though to have the required interaction with the proxy manager.
ProxyToken string
// WatchCh is a close-only chan that is closed when the proxy is removed or
@ -574,7 +581,13 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
// (since that has to do other book keeping). The token passed here is the ACL
// token the service used to register itself so must have write on service
// record. AddProxy returns the newly added proxy and an error.
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, error) {
//
// The restoredProxyToken argument should only be used when restoring proxy
// definitions from disk; new proxies must leave it blank to get a new token
// assigned. We need to restore from disk to enable to continue authenticating
// running proxies that already had that credential injected.
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token,
restoredProxyToken string) (*ManagedProxy, error) {
if proxy == nil {
return nil, fmt.Errorf("no proxy")
}
@ -603,19 +616,35 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
Port: cfg.BindPort,
}
pToken, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
// Lock now. We can't lock earlier as l.Service would deadlock and shouldn't
// anyway to minimise the critical section.
l.Lock()
defer l.Unlock()
pToken := restoredProxyToken
// Does this proxy instance allready exist?
if existing, ok := l.managedProxies[svc.ID]; ok {
svc.Port = existing.Proxy.ProxyService.Port
// Keep the existing proxy token so we don't have to restart proxy to
// re-inject token.
pToken = existing.ProxyToken
// If the user didn't explicitly change the port, use the old one instead of
// assigning new.
if svc.Port < 1 {
svc.Port = existing.Proxy.ProxyService.Port
}
} else if proxyService, ok := l.services[svc.ID]; ok {
// The proxy-service already exists so keep the port that got assigned. This
// happens on reload from disk since service definitions are reloaded first.
svc.Port = proxyService.Service.Port
}
// If this is a new instance, generate a token
if pToken == "" {
pToken, err = uuid.GenerateUUID()
if err != nil {
return nil, err
}
}
// Allocate port if needed (min and max inclusive).