mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
Make tests pass and clean proxy persistence. No detached child changes yet.
This is a good state for persistence stuff to re-start the detached child work that got mixed up last time.
This commit is contained in:
parent
cdc7cfaa36
commit
df2cb30b01
@ -15,8 +15,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -1691,7 +1691,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
require := require.New(t)
|
||||
assert := assert.New(t)
|
||||
|
||||
_, err := state.AddProxy(&p1, "fake-token")
|
||||
_, err := state.AddProxy(&p1, "fake-token", "")
|
||||
require.Error(err, "should fail as the target service isn't registered")
|
||||
|
||||
// Sanity check done, lets add a couple of target services to the state
|
||||
@ -1710,7 +1710,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
require.NoError(err)
|
||||
|
||||
// Should work now
|
||||
pstate, err := state.AddProxy(&p1, "fake-token")
|
||||
pstate, err := state.AddProxy(&p1, "fake-token", "")
|
||||
require.NoError(err)
|
||||
|
||||
svc := pstate.Proxy.ProxyService
|
||||
@ -1724,8 +1724,9 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
|
||||
{
|
||||
// Re-registering same proxy again should not pick a random port but re-use
|
||||
// the assigned one.
|
||||
pstateDup, err := state.AddProxy(&p1, "fake-token")
|
||||
// the assigned one. It should also keep the same proxy token since we don't
|
||||
// want to force restart for config change.
|
||||
pstateDup, err := state.AddProxy(&p1, "fake-token", "")
|
||||
require.NoError(err)
|
||||
svcDup := pstateDup.Proxy.ProxyService
|
||||
|
||||
@ -1736,6 +1737,8 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
assert.Equal("", svcDup.Address, "should have empty address by default")
|
||||
// Port must be same as before
|
||||
assert.Equal(svc.Port, svcDup.Port)
|
||||
// Same ProxyToken
|
||||
assert.Equal(pstate.ProxyToken, pstateDup.ProxyToken)
|
||||
}
|
||||
|
||||
// Let's register a notifier now
|
||||
@ -1748,7 +1751,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
// Second proxy should claim other port
|
||||
p2 := p1
|
||||
p2.TargetServiceID = "cache"
|
||||
pstate2, err := state.AddProxy(&p2, "fake-token")
|
||||
pstate2, err := state.AddProxy(&p2, "fake-token", "")
|
||||
require.NoError(err)
|
||||
svc2 := pstate2.Proxy.ProxyService
|
||||
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||
@ -1764,7 +1767,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
// Third proxy should fail as all ports are used
|
||||
p3 := p1
|
||||
p3.TargetServiceID = "db"
|
||||
_, err = state.AddProxy(&p3, "fake-token")
|
||||
_, err = state.AddProxy(&p3, "fake-token", "")
|
||||
require.Error(err)
|
||||
|
||||
// Should have a notification but we'll do nothing so that the next
|
||||
@ -1775,7 +1778,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
"bind_port": 1234,
|
||||
"bind_address": "0.0.0.0",
|
||||
}
|
||||
pstate3, err := state.AddProxy(&p3, "fake-token")
|
||||
pstate3, err := state.AddProxy(&p3, "fake-token", "")
|
||||
require.NoError(err)
|
||||
svc3 := pstate3.Proxy.ProxyService
|
||||
require.Equal("0.0.0.0", svc3.Address)
|
||||
@ -1793,7 +1796,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
require.NotNil(gotP3)
|
||||
var ws memdb.WatchSet
|
||||
ws.Add(gotP3.WatchCh)
|
||||
pstate3, err = state.AddProxy(&p3updated, "fake-token")
|
||||
pstate3, err = state.AddProxy(&p3updated, "fake-token", "")
|
||||
require.NoError(err)
|
||||
svc3 = pstate3.Proxy.ProxyService
|
||||
require.Equal("0.0.0.0", svc3.Address)
|
||||
@ -1817,7 +1820,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
// Should be able to create a new proxy for that service with the port (it
|
||||
// should have been "freed").
|
||||
p4 := p2
|
||||
pstate4, err := state.AddProxy(&p4, "fake-token")
|
||||
pstate4, err := state.AddProxy(&p4, "fake-token", "")
|
||||
require.NoError(err)
|
||||
svc4 := pstate4.Proxy.ProxyService
|
||||
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||
@ -1855,6 +1858,68 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the logic for retaining tokens and ports through restore (i.e.
|
||||
// proxy-service already restored and token passed in externally)
|
||||
func TestStateProxyRestore(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
state := local.NewState(local.Config{
|
||||
// Wide random range to make it very unlikely to pass by chance
|
||||
ProxyBindMinPort: 10000,
|
||||
ProxyBindMaxPort: 20000,
|
||||
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
||||
|
||||
// Stub state syncing
|
||||
state.TriggerSyncChanges = func() {}
|
||||
|
||||
webSvc := structs.NodeService{
|
||||
Service: "web",
|
||||
}
|
||||
|
||||
p1 := structs.ConnectManagedProxy{
|
||||
ExecMode: structs.ProxyExecModeDaemon,
|
||||
Command: []string{"consul", "connect", "proxy"},
|
||||
TargetServiceID: "web",
|
||||
}
|
||||
|
||||
p2 := p1
|
||||
|
||||
require := require.New(t)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Add a target service
|
||||
require.NoError(state.AddService(&webSvc, "fake-token-web"))
|
||||
|
||||
// Add the proxy for first time to get the proper service definition to
|
||||
// register
|
||||
pstate, err := state.AddProxy(&p1, "fake-token", "")
|
||||
require.NoError(err)
|
||||
|
||||
// Now start again with a brand new state
|
||||
state2 := local.NewState(local.Config{
|
||||
// Wide random range to make it very unlikely to pass by chance
|
||||
ProxyBindMinPort: 10000,
|
||||
ProxyBindMaxPort: 20000,
|
||||
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
||||
|
||||
// Stub state syncing
|
||||
state2.TriggerSyncChanges = func() {}
|
||||
|
||||
// Register the target service
|
||||
require.NoError(state2.AddService(&webSvc, "fake-token-web"))
|
||||
|
||||
// "Restore" the proxy service
|
||||
require.NoError(state.AddService(p1.ProxyService, "fake-token-web"))
|
||||
|
||||
// Now we can AddProxy with the "restored" token
|
||||
pstate2, err := state.AddProxy(&p2, "fake-token", pstate.ProxyToken)
|
||||
require.NoError(err)
|
||||
|
||||
// Check it still has the same port and token as before
|
||||
assert.Equal(pstate.ProxyToken, pstate2.ProxyToken)
|
||||
assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
|
||||
}
|
||||
|
||||
// drainCh drains a channel by reading messages until it would block.
|
||||
func drainCh(ch chan struct{}) {
|
||||
for {
|
||||
|
@ -293,16 +293,28 @@ func (p *Daemon) Stop() error {
|
||||
|
||||
case <-time.After(gracefulWait):
|
||||
// Interrupt didn't work
|
||||
p.Logger.Printf("[DEBUG] agent/proxy: graceful wait of %s passed, "+
|
||||
"killing", gracefulWait)
|
||||
}
|
||||
} else if isProcessAlreadyFinishedErr(err) {
|
||||
// This can happen due to races between signals and polling.
|
||||
return nil
|
||||
} else {
|
||||
p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err)
|
||||
}
|
||||
|
||||
// Graceful didn't work, forcibly kill
|
||||
return process.Kill()
|
||||
// Graceful didn't work (e.g. on windows where SIGINT isn't implemented),
|
||||
// forcibly kill
|
||||
err = process.Kill()
|
||||
if err != nil && isProcessAlreadyFinishedErr(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// stopKeepAlive is like Stop but keeps the process running. This is
|
||||
// used only for tests.
|
||||
func (p *Daemon) stopKeepAlive() error {
|
||||
// Close implements Proxy by stopping the run loop but not killing the process.
|
||||
// One Close is called, Stop has no effect.
|
||||
func (p *Daemon) Close() error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
|
@ -398,7 +398,7 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
|
||||
snap := d.MarshalSnapshot()
|
||||
|
||||
// Stop the original daemon but keep it alive
|
||||
require.NoError(d.stopKeepAlive())
|
||||
require.NoError(d.Close())
|
||||
|
||||
// Restore the second daemon
|
||||
d2 := &Daemon{Logger: testLogger}
|
||||
|
@ -324,7 +324,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
|
||||
|
||||
// Add a second proxy so that we can determine when we're up
|
||||
// and running.
|
||||
path2 := filepath.Join(td, "file")
|
||||
path2 := filepath.Join(td, "file2")
|
||||
testStateProxy(t, state, "db", helperProcess("start-stop", path2))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path2)
|
||||
@ -343,7 +343,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
r.Fatalf("file still exists")
|
||||
r.Fatalf("file still exists: %s", path)
|
||||
})
|
||||
}
|
||||
|
||||
@ -379,7 +379,7 @@ func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.
|
||||
ExecMode: structs.ProxyExecModeDaemon,
|
||||
Command: command,
|
||||
TargetServiceID: service,
|
||||
}, "token")
|
||||
}, "token", "")
|
||||
require.NoError(t, err)
|
||||
|
||||
return p.Proxy.ProxyService.ID
|
||||
|
@ -5,6 +5,7 @@ type Noop struct{}
|
||||
|
||||
func (p *Noop) Start() error { return nil }
|
||||
func (p *Noop) Stop() error { return nil }
|
||||
func (p *Noop) Close() error { return nil }
|
||||
func (p *Noop) Equal(Proxy) bool { return true }
|
||||
func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
|
||||
func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }
|
||||
|
14
agent/proxy/process.go
Normal file
14
agent/proxy/process.go
Normal file
@ -0,0 +1,14 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// isProcessAlreadyFinishedErr does a janky comparison with an error string
|
||||
// defined in os/exec_unix.go and os/exec_windows.go which we encounter due to
|
||||
// races with polling the external process. These case tests to fail since Stop
|
||||
// returns an error sometimes so we should notice if this string stops matching
|
||||
// the error in a future go version.
|
||||
func isProcessAlreadyFinishedErr(err error) bool {
|
||||
return strings.Contains(err.Error(), "os: process already finished")
|
||||
}
|
@ -40,12 +40,18 @@ type Proxy interface {
|
||||
Start() error
|
||||
|
||||
// Stop stops the proxy and disallows it from ever being started again.
|
||||
// This should also clean up any resources used by this Proxy.
|
||||
//
|
||||
// If the proxy is not started yet, this should not return an error, but
|
||||
// it should disallow Start from working again. If the proxy is already
|
||||
// stopped, this should not return an error.
|
||||
Stop() error
|
||||
|
||||
// Close should clean up any resources associated with this proxy but
|
||||
// keep it running in the background. Only one of Close or Stop can be
|
||||
// called.
|
||||
Close() error
|
||||
|
||||
// Equal returns true if the argument is equal to the proxy being called.
|
||||
// This is called by the manager to determine if a change in configuration
|
||||
// results in a proxy that needs to be restarted or not. If Equal returns
|
||||
|
@ -1140,7 +1140,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
|
||||
Port: 8000,
|
||||
Connect: &AgentServiceConnect{
|
||||
Proxy: &AgentServiceConnectProxy{
|
||||
Command: []string{"consul connect proxy"},
|
||||
Command: []string{"consul", "connect", "proxy"},
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
@ -1157,7 +1157,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
|
||||
ProxyServiceID: "foo-proxy",
|
||||
TargetServiceID: "foo",
|
||||
TargetServiceName: "foo",
|
||||
ContentHash: "93baee1d838888ae",
|
||||
ContentHash: "2a29f8237db69d0e",
|
||||
ExecMode: "daemon",
|
||||
Command: []string{"consul", "connect", "proxy"},
|
||||
Config: map[string]interface{}{
|
||||
|
Loading…
x
Reference in New Issue
Block a user