diff --git a/agent/agent.go b/agent/agent.go index 6f909e5c71..ca62a03cde 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -50,6 +50,9 @@ const ( // Path to save agent service definitions servicesDir = "services" + // Path to save agent proxy definitions + proxyDir = "proxies" + // Path to save local agent checks checksDir = "checks" checkStateDir = "checks/state" @@ -1606,6 +1609,39 @@ func (a *Agent) purgeService(serviceID string) error { return nil } +// persistedProxy is used to wrap a proxy definition and bundle it with an Proxy +// token so we can continue to authenticate the running proxy after a restart. +type persistedProxy struct { + ProxyToken string + Proxy *structs.ConnectManagedProxy +} + +// persistProxy saves a proxy definition to a JSON file in the data dir +func (a *Agent) persistProxy(proxy *local.ManagedProxy) error { + proxyPath := filepath.Join(a.config.DataDir, proxyDir, + stringHash(proxy.Proxy.ProxyService.ID)) + + wrapped := persistedProxy{ + ProxyToken: proxy.ProxyToken, + Proxy: proxy.Proxy, + } + encoded, err := json.Marshal(wrapped) + if err != nil { + return err + } + + return file.WriteAtomic(proxyPath, encoded) +} + +// purgeProxy removes a persisted proxy definition file from the data dir +func (a *Agent) purgeProxy(proxyID string) error { + proxyPath := filepath.Join(a.config.DataDir, proxyDir, stringHash(proxyID)) + if _, err := os.Stat(proxyPath); err == nil { + return os.Remove(proxyPath) + } + return nil +} + // persistCheck saves a check definition to the local agent's state directory func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckType) error { checkPath := filepath.Join(a.config.DataDir, checksDir, checkIDHash(check.CheckID)) @@ -2048,7 +2084,13 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { // since this is only ever called when setting up a _managed_ proxy which was // registered as part of a service registration either from config or HTTP API // call. -func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error { +// +// The restoredProxyToken argument should only be used when restoring proxy +// definitions from disk; new proxies must leave it blank to get a new token +// assigned. We need to restore from disk to enable to continue authenticating +// running proxies that already had that credential injected. +func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool, + restoredProxyToken string) error { // Lookup the target service token in state if there is one. token := a.State.ServiceToken(proxy.TargetServiceID) @@ -2064,7 +2106,7 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error // NodeService for the proxy populated with the allocated (or configured) port // and an ID, but it doesn't add it to the agent directly since that could // deadlock and we may need to coordinate adding it and persisting etc. - proxyState, err := a.State.AddProxy(proxy, token) + proxyState, err := a.State.AddProxy(proxy, token, restoredProxyToken) if err != nil { return err } @@ -2078,7 +2120,10 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error return err } - // TODO(banks): persist some of the local proxy state (not the _proxy_ token). + // Persist the proxy + if persist && !a.config.DevMode { + return a.persistProxy(proxyState) + } return nil } @@ -2135,7 +2180,9 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error { return err } - // TODO(banks): unpersist proxy + if persist && !a.config.DevMode { + return a.purgeProxy(proxyID) + } return nil } @@ -2615,13 +2662,71 @@ func (a *Agent) loadProxies(conf *config.RuntimeConfig) error { if proxy == nil { continue } - if err := a.AddProxy(proxy, false); err != nil { + if err := a.AddProxy(proxy, false, ""); err != nil { return fmt.Errorf("failed adding proxy: %s", err) } } } - // TODO(banks): persist proxy state and re-load it here? + // Load any persisted proxies + proxyDir := filepath.Join(a.config.DataDir, proxyDir) + files, err := ioutil.ReadDir(proxyDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err) + } + for _, fi := range files { + // Skip all dirs + if fi.IsDir() { + continue + } + + // Skip all partially written temporary files + if strings.HasSuffix(fi.Name(), "tmp") { + a.logger.Printf("[WARN] agent: Ignoring temporary proxy file %v", fi.Name()) + continue + } + + // Open the file for reading + file := filepath.Join(proxyDir, fi.Name()) + fh, err := os.Open(file) + if err != nil { + return fmt.Errorf("failed opening proxy file %q: %s", file, err) + } + + // Read the contents into a buffer + buf, err := ioutil.ReadAll(fh) + fh.Close() + if err != nil { + return fmt.Errorf("failed reading proxy file %q: %s", file, err) + } + + // Try decoding the proxy definition + var p persistedProxy + if err := json.Unmarshal(buf, &p); err != nil { + a.logger.Printf("[ERR] agent: Failed decoding proxy file %q: %s", file, err) + continue + } + proxyID := p.Proxy.ProxyService.ID + + if a.State.Proxy(proxyID) != nil { + // Purge previously persisted proxy. This allows config to be preferred + // over services persisted from the API. + a.logger.Printf("[DEBUG] agent: proxy %q exists, not restoring from %q", + proxyID, file) + if err := a.purgeProxy(proxyID); err != nil { + return fmt.Errorf("failed purging proxy %q: %s", proxyID, err) + } + } else { + a.logger.Printf("[DEBUG] agent: restored proxy definition %q from %q", + proxyID, file) + if err := a.AddProxy(p.Proxy, false, p.ProxyToken); err != nil { + return fmt.Errorf("failed adding proxy %q: %s", proxyID, err) + } + } + } return nil } diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 1318e8f86e..6c81e56375 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -629,7 +629,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re } // Add proxy (which will add proxy service so do it before we trigger sync) if proxy != nil { - if err := s.agent.AddProxy(proxy, true); err != nil { + if err := s.agent.AddProxy(proxy, true, ""); err != nil { return nil, err } } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index adef1b2373..3adda1db46 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -73,7 +73,7 @@ func TestAgent_Services(t *testing.T) { }, TargetServiceID: "mysql", } - _, err := a.State.AddProxy(prxy1, "") + _, err := a.State.AddProxy(prxy1, "", "") require.NoError(t, err) req, _ := http.NewRequest("GET", "/v1/agent/services", nil) diff --git a/agent/agent_test.go b/agent/agent_test.go index 993bf3b25e..fac16fba5c 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -26,6 +26,7 @@ import ( "github.com/hashicorp/consul/types" uuid "github.com/hashicorp/go-uuid" "github.com/pascaldekloe/goe/verify" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1350,6 +1351,187 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { } } +func TestAgent_PersistProxy(t *testing.T) { + t.Parallel() + dataDir := testutil.TempDir(t, "agent") // we manage the data dir + cfg := ` + server = false + bootstrap = false + data_dir = "` + dataDir + `" + ` + a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir} + a.Start() + defer os.RemoveAll(dataDir) + defer a.Shutdown() + + require := require.New(t) + assert := assert.New(t) + + // Add a service to proxy (precondition for AddProxy) + svc1 := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + require.NoError(a.AddService(svc1, nil, true, "")) + + // Add a proxy for it + proxy := &structs.ConnectManagedProxy{ + TargetServiceID: svc1.ID, + Command: []string{"/bin/sleep", "3600"}, + } + + file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy")) + + // Proxy is not persisted unless requested + require.NoError(a.AddProxy(proxy, false, "")) + _, err := os.Stat(file) + require.Error(err, "proxy should not be persisted") + + // Proxy is persisted if requested + require.NoError(a.AddProxy(proxy, true, "")) + _, err = os.Stat(file) + require.NoError(err, "proxy should be persisted") + + content, err := ioutil.ReadFile(file) + require.NoError(err) + + var gotProxy persistedProxy + require.NoError(json.Unmarshal(content, &gotProxy)) + assert.Equal(proxy.Command, gotProxy.Proxy.Command) + assert.Len(gotProxy.ProxyToken, 36) // sanity check for UUID + + // Updates service definition on disk + proxy.Config = map[string]interface{}{ + "foo": "bar", + } + require.NoError(a.AddProxy(proxy, true, "")) + + content, err = ioutil.ReadFile(file) + require.NoError(err) + + require.NoError(json.Unmarshal(content, &gotProxy)) + assert.Equal(gotProxy.Proxy.Command, proxy.Command) + assert.Equal(gotProxy.Proxy.Config, proxy.Config) + assert.Len(gotProxy.ProxyToken, 36) // sanity check for UUID + + a.Shutdown() + + // Should load it back during later start + a2 := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir} + a2.Start() + defer a2.Shutdown() + + restored := a2.State.Proxy("redis-proxy") + require.NotNil(restored) + assert.Equal(gotProxy.ProxyToken, restored.ProxyToken) + // Ensure the port that was auto picked at random is the same again + assert.Equal(gotProxy.Proxy.ProxyService.Port, restored.Proxy.ProxyService.Port) + assert.Equal(gotProxy.Proxy.Command, restored.Proxy.Command) +} + +func TestAgent_PurgeProxy(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + require := require.New(t) + + // Add a service to proxy (precondition for AddProxy) + svc1 := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + require.NoError(a.AddService(svc1, nil, true, "")) + + // Add a proxy for it + proxy := &structs.ConnectManagedProxy{ + TargetServiceID: svc1.ID, + Command: []string{"/bin/sleep", "3600"}, + } + proxyID := "redis-proxy" + require.NoError(a.AddProxy(proxy, true, "")) + + file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy")) + + // Not removed + require.NoError(a.RemoveProxy(proxyID, false)) + _, err := os.Stat(file) + require.NoError(err, "should not be removed") + + // Re-add the proxy + require.NoError(a.AddProxy(proxy, true, "")) + + // Removed + require.NoError(a.RemoveProxy(proxyID, true)) + _, err = os.Stat(file) + require.Error(err, "should be removed") +} + +func TestAgent_PurgeProxyOnDuplicate(t *testing.T) { + t.Parallel() + dataDir := testutil.TempDir(t, "agent") // we manage the data dir + cfg := ` + data_dir = "` + dataDir + `" + server = false + bootstrap = false + ` + a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir} + a.Start() + defer a.Shutdown() + defer os.RemoveAll(dataDir) + + require := require.New(t) + + // Add a service to proxy (precondition for AddProxy) + svc1 := &structs.NodeService{ + ID: "redis", + Service: "redis", + Tags: []string{"foo"}, + Port: 8000, + } + require.NoError(a.AddService(svc1, nil, true, "")) + + // Add a proxy for it + proxy := &structs.ConnectManagedProxy{ + TargetServiceID: svc1.ID, + Command: []string{"/bin/sleep", "3600"}, + } + proxyID := "redis-proxy" + require.NoError(a.AddProxy(proxy, true, "")) + + a.Shutdown() + + // Try bringing the agent back up with the service already + // existing in the config + a2 := &TestAgent{Name: t.Name() + "-a2", HCL: cfg + ` + service = { + id = "redis" + name = "redis" + tags = ["bar"] + port = 9000 + connect { + proxy { + command = ["/bin/sleep", "3600"] + } + } + } + `, DataDir: dataDir} + a2.Start() + defer a2.Shutdown() + + file := filepath.Join(a.Config.DataDir, proxyDir, stringHash(proxyID)) + _, err := os.Stat(file) + require.Error(err, "should have removed remote state") + + result := a2.State.Proxy(proxyID) + require.NotNil(result) + require.Equal(proxy.Command, result.Proxy.Command) +} + func TestAgent_PersistCheck(t *testing.T) { t.Parallel() dataDir := testutil.TempDir(t, "agent") // we manage the data dir @@ -2482,7 +2664,7 @@ func TestAgent_AddProxy(t *testing.T) { t.Run(tt.desc, func(t *testing.T) { require := require.New(t) - err := a.AddProxy(tt.proxy, false) + err := a.AddProxy(tt.proxy, false, "") if tt.wantErr { require.Error(err) return @@ -2522,7 +2704,7 @@ func TestAgent_RemoveProxy(t *testing.T) { ExecMode: structs.ProxyExecModeDaemon, Command: []string{"foo"}, } - require.NoError(a.AddProxy(pReg, false)) + require.NoError(a.AddProxy(pReg, false, "")) // Test the ID was created as we expect. gotProxy := a.State.Proxy("web-proxy") diff --git a/agent/local/state.go b/agent/local/state.go index 22d654af4d..7612bd86ae 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -118,12 +118,19 @@ type ManagedProxy struct { // ProxyToken is a special local-only security token that grants the bearer // access to the proxy's config as well as allowing it to request certificates - // on behalf of the TargetService. Certain connect endpoints will validate - // against this token and if it matches will then use the TargetService.Token - // to actually authenticate the upstream RPC on behalf of the service. This - // token is passed securely to the proxy process via ENV vars and should never - // be exposed any other way. Unmanaged proxies will never see this and need to - // use service-scoped ACL tokens distributed externally. + // on behalf of the target service. Certain connect endpoints will validate + // against this token and if it matches will then use the target service's + // registration token to actually authenticate the upstream RPC on behalf of + // the service. This token is passed securely to the proxy process via ENV + // vars and should never be exposed any other way. Unmanaged proxies will + // never see this and need to use service-scoped ACL tokens distributed + // externally. It is persisted in the local state to allow authenticating + // running proxies after the agent restarts. + // + // TODO(banks): In theory we only need to persist this at all to _validate_ + // which means we could keep only a hash in memory and on disk and only pass + // the actual token to the process on startup. That would require a bit of + // refactoring though to have the required interaction with the proxy manager. ProxyToken string // WatchCh is a close-only chan that is closed when the proxy is removed or @@ -574,7 +581,13 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState { // (since that has to do other book keeping). The token passed here is the ACL // token the service used to register itself so must have write on service // record. AddProxy returns the newly added proxy and an error. -func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, error) { +// +// The restoredProxyToken argument should only be used when restoring proxy +// definitions from disk; new proxies must leave it blank to get a new token +// assigned. We need to restore from disk to enable to continue authenticating +// running proxies that already had that credential injected. +func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token, + restoredProxyToken string) (*ManagedProxy, error) { if proxy == nil { return nil, fmt.Errorf("no proxy") } @@ -603,19 +616,35 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man Port: cfg.BindPort, } - pToken, err := uuid.GenerateUUID() - if err != nil { - return nil, err - } - // Lock now. We can't lock earlier as l.Service would deadlock and shouldn't // anyway to minimise the critical section. l.Lock() defer l.Unlock() + pToken := restoredProxyToken + // Does this proxy instance allready exist? if existing, ok := l.managedProxies[svc.ID]; ok { - svc.Port = existing.Proxy.ProxyService.Port + // Keep the existing proxy token so we don't have to restart proxy to + // re-inject token. + pToken = existing.ProxyToken + // If the user didn't explicitly change the port, use the old one instead of + // assigning new. + if svc.Port < 1 { + svc.Port = existing.Proxy.ProxyService.Port + } + } else if proxyService, ok := l.services[svc.ID]; ok { + // The proxy-service already exists so keep the port that got assigned. This + // happens on reload from disk since service definitions are reloaded first. + svc.Port = proxyService.Service.Port + } + + // If this is a new instance, generate a token + if pToken == "" { + pToken, err = uuid.GenerateUUID() + if err != nil { + return nil, err + } } // Allocate port if needed (min and max inclusive).