mirror of https://github.com/status-im/consul.git
agent: start proxy manager
This commit is contained in:
parent
7879e1d2ef
commit
1a2b28602c
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/local"
|
"github.com/hashicorp/consul/agent/local"
|
||||||
|
"github.com/hashicorp/consul/agent/proxy"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/systemd"
|
"github.com/hashicorp/consul/agent/systemd"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
|
@ -200,6 +201,9 @@ type Agent struct {
|
||||||
// be updated at runtime, so should always be used instead of going to
|
// be updated at runtime, so should always be used instead of going to
|
||||||
// the configuration directly.
|
// the configuration directly.
|
||||||
tokens *token.Store
|
tokens *token.Store
|
||||||
|
|
||||||
|
// proxyManager is the proxy process manager for managed Connect proxies.
|
||||||
|
proxyManager *proxy.Manager
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(c *config.RuntimeConfig) (*Agent, error) {
|
func New(c *config.RuntimeConfig) (*Agent, error) {
|
||||||
|
@ -353,6 +357,14 @@ func (a *Agent) Start() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create the proxy process manager and start it. This is purposely
|
||||||
|
// done here after the local state above is loaded in so we can have
|
||||||
|
// a more accurate initial state view.
|
||||||
|
a.proxyManager = proxy.NewManager()
|
||||||
|
a.proxyManager.State = a.State
|
||||||
|
a.proxyManager.Logger = a.logger
|
||||||
|
go a.proxyManager.Run()
|
||||||
|
|
||||||
// Start watching for critical services to deregister, based on their
|
// Start watching for critical services to deregister, based on their
|
||||||
// checks.
|
// checks.
|
||||||
go a.reapServices()
|
go a.reapServices()
|
||||||
|
@ -1269,9 +1281,11 @@ func (a *Agent) ShutdownAgent() error {
|
||||||
chk.Stop()
|
chk.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unload all our proxies so that we stop the running processes.
|
// Stop the proxy manager
|
||||||
if err := a.unloadProxies(); err != nil {
|
// NOTE(mitchellh): we use Kill for now to kill the processes since
|
||||||
a.logger.Printf("[WARN] agent: error stopping managed proxies: %s", err)
|
// snapshotting isn't implemented. This should change to Close later.
|
||||||
|
if err := a.proxyManager.Kill(); err != nil {
|
||||||
|
a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
@ -2038,7 +2052,6 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error
|
||||||
// Lookup the target service token in state if there is one.
|
// Lookup the target service token in state if there is one.
|
||||||
token := a.State.ServiceToken(proxy.TargetServiceID)
|
token := a.State.ServiceToken(proxy.TargetServiceID)
|
||||||
|
|
||||||
/*
|
|
||||||
// Determine if we need to default the command
|
// Determine if we need to default the command
|
||||||
if proxy.ExecMode == structs.ProxyExecModeDaemon && len(proxy.Command) == 0 {
|
if proxy.ExecMode == structs.ProxyExecModeDaemon && len(proxy.Command) == 0 {
|
||||||
// We use the globally configured default command. If it is empty
|
// We use the globally configured default command. If it is empty
|
||||||
|
@ -2054,7 +2067,6 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error
|
||||||
|
|
||||||
proxy.CommandDefault = cmd
|
proxy.CommandDefault = cmd
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
// Add the proxy to local state first since we may need to assign a port which
|
// Add the proxy to local state first since we may need to assign a port which
|
||||||
// needs to be coordinate under state lock. AddProxy will generate the
|
// needs to be coordinate under state lock. AddProxy will generate the
|
||||||
|
|
|
@ -70,7 +70,7 @@ func TestAgent_Services(t *testing.T) {
|
||||||
},
|
},
|
||||||
TargetServiceID: "mysql",
|
TargetServiceID: "mysql",
|
||||||
}
|
}
|
||||||
_, _, err := a.State.AddProxy(prxy1, "")
|
_, err := a.State.AddProxy(prxy1, "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
|
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,9 +147,15 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := process.Wait()
|
ps, err := process.Wait()
|
||||||
process = nil
|
process = nil
|
||||||
p.Logger.Printf("[INFO] agent/proxy: daemon exited: %s", err)
|
if err != nil {
|
||||||
|
p.Logger.Printf("[INFO] agent/proxy: daemon exited with error: %s", err)
|
||||||
|
} else if status, ok := ps.Sys().(syscall.WaitStatus); ok {
|
||||||
|
p.Logger.Printf(
|
||||||
|
"[INFO] agent/proxy: daemon exited with exit code: %d",
|
||||||
|
status.ExitStatus())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,7 +172,12 @@ func (p *Daemon) start() (*os.Process, error) {
|
||||||
copy(cmd.Env, p.Command.Env)
|
copy(cmd.Env, p.Command.Env)
|
||||||
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))
|
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))
|
||||||
|
|
||||||
|
// TODO(mitchellh): temporary until we introduce the file based logging
|
||||||
|
cmd.Stdout = os.Stdout
|
||||||
|
cmd.Stderr = os.Stderr
|
||||||
|
|
||||||
// Start it
|
// Start it
|
||||||
|
p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", cmd.Path, cmd.Args)
|
||||||
err := cmd.Start()
|
err := cmd.Start()
|
||||||
return cmd.Process, err
|
return cmd.Process, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -192,6 +192,7 @@ func (m *Manager) Run() {
|
||||||
m.State.NotifyProxy(notifyCh)
|
m.State.NotifyProxy(notifyCh)
|
||||||
defer m.State.StopNotifyProxy(notifyCh)
|
defer m.State.StopNotifyProxy(notifyCh)
|
||||||
|
|
||||||
|
m.Logger.Println("[DEBUG] agent/proxy: managed Connect proxy manager started")
|
||||||
for {
|
for {
|
||||||
// Sync first, before waiting on further notifications so that
|
// Sync first, before waiting on further notifications so that
|
||||||
// we can start with a known-current state.
|
// we can start with a known-current state.
|
||||||
|
@ -203,6 +204,7 @@ func (m *Manager) Run() {
|
||||||
|
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
// Stop immediately, no cleanup
|
// Stop immediately, no cleanup
|
||||||
|
m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -298,7 +300,7 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
|
||||||
// Build the command to execute.
|
// Build the command to execute.
|
||||||
var cmd exec.Cmd
|
var cmd exec.Cmd
|
||||||
cmd.Path = command[0]
|
cmd.Path = command[0]
|
||||||
cmd.Args = command[1:]
|
cmd.Args = command // idx 0 is path but preserved since it should be
|
||||||
|
|
||||||
// Build the daemon structure
|
// Build the daemon structure
|
||||||
return &Daemon{
|
return &Daemon{
|
||||||
|
|
|
@ -49,6 +49,8 @@ func (m ProxyExecMode) String() string {
|
||||||
return "daemon"
|
return "daemon"
|
||||||
case ProxyExecModeScript:
|
case ProxyExecModeScript:
|
||||||
return "script"
|
return "script"
|
||||||
|
case ProxyExecModeTest:
|
||||||
|
return "test"
|
||||||
default:
|
default:
|
||||||
return "unknown"
|
return "unknown"
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue