Merge pull request #8461 from hashicorp/dnephin/remove-notify-shutdown

agent/consul: Remove NotifyShutdown
This commit is contained in:
Daniel Nephin 2020-08-13 11:16:48 -04:00
parent 81de78d131
commit 533c53b8ef
6 changed files with 45 additions and 107 deletions

View File

@ -401,7 +401,6 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
func TestAutopilot_MinQuorum(t *testing.T) { func TestAutopilot_MinQuorum(t *testing.T) {
dc := "dc1" dc := "dc1"
closeMap := make(map[string]chan struct{})
conf := func(c *Config) { conf := func(c *Config) {
c.Datacenter = dc c.Datacenter = dc
c.Bootstrap = false c.Bootstrap = false
@ -409,13 +408,6 @@ func TestAutopilot_MinQuorum(t *testing.T) {
c.AutopilotConfig.MinQuorum = 3 c.AutopilotConfig.MinQuorum = 3
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(2) c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(2)
c.AutopilotInterval = 100 * time.Millisecond c.AutopilotInterval = 100 * time.Millisecond
//Let us know when a server is actually gone
ch := make(chan struct{})
c.NotifyShutdown = func() {
t.Logf("%v is shutdown", c.NodeName)
close(ch)
}
closeMap[c.NodeName] = ch
} }
dir1, s1 := testServerWithConfig(t, conf) dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
@ -463,8 +455,7 @@ func TestAutopilot_MinQuorum(t *testing.T) {
if dead == nil { if dead == nil {
t.Fatalf("no members set") t.Fatalf("no members set")
} }
dead.Shutdown() require.NoError(t, dead.Shutdown())
<-closeMap[dead.config.NodeName]
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
leader := findStatus(true) leader := findStatus(true)
if leader == nil { if leader == nil {
@ -480,10 +471,7 @@ func TestAutopilot_MinQuorum(t *testing.T) {
delete(servers, dead.config.NodeName) delete(servers, dead.config.NodeName)
//Autopilot should not take this one into left //Autopilot should not take this one into left
dead = findStatus(false) dead = findStatus(false)
if err := dead.Shutdown(); err != nil { require.NoError(t, dead.Shutdown())
t.Fatalf("could not shut down %s, error %v", dead.config.NodeName, err)
}
<-closeMap[dead.config.NodeName]
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
leader := findStatus(true) leader := findStatus(true)
@ -496,5 +484,4 @@ func TestAutopilot_MinQuorum(t *testing.T) {
} }
} }
}) })
} }

View File

@ -23,18 +23,15 @@ import (
func testClientConfig(t *testing.T) (string, *Config) { func testClientConfig(t *testing.T) (string, *Config) {
dir := testutil.TempDir(t, "consul") dir := testutil.TempDir(t, "consul")
t.Cleanup(func() {
os.RemoveAll(dir)
})
config := DefaultConfig() config := DefaultConfig()
ports := freeport.MustTake(2) ports := freeport.MustTake(2)
t.Cleanup(func() {
returnPortsFn := func() {
// The method of plumbing this into the client shutdown hook doesn't
// cover all exit points, so we insulate this against multiple
// invocations and then it's safe to call it a bunch of times.
freeport.Return(ports) freeport.Return(ports)
config.NotifyShutdown = nil // self-erasing })
}
config.NotifyShutdown = returnPortsFn
config.Datacenter = "dc1" config.Datacenter = "dc1"
config.DataDir = dir config.DataDir = dir
@ -82,9 +79,6 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
} }
client, err := NewClient(config, WithLogger(logger), WithTLSConfigurator(tlsConf)) client, err := NewClient(config, WithLogger(logger), WithTLSConfigurator(tlsConf))
if err != nil {
config.NotifyShutdown()
}
return dir, client, err return dir, client, err
} }
@ -434,7 +428,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
func TestClient_RPC_TLS(t *testing.T) { func TestClient_RPC_TLS(t *testing.T) {
t.Parallel() t.Parallel()
dir1, conf1 := testServerConfig(t) _, conf1 := testServerConfig(t)
conf1.VerifyIncoming = true conf1.VerifyIncoming = true
conf1.VerifyOutgoing = true conf1.VerifyOutgoing = true
configureTLS(conf1) configureTLS(conf1)
@ -442,19 +436,12 @@ func TestClient_RPC_TLS(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
dir2, conf2 := testClientConfig(t) _, conf2 := testClientConfig(t)
defer conf2.NotifyShutdown()
conf2.VerifyOutgoing = true conf2.VerifyOutgoing = true
configureTLS(conf2) configureTLS(conf2)
c1, err := newClient(t, conf2) c1 := newClient(t, conf2)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try an RPC // Try an RPC
var out struct{} var out struct{}
@ -479,39 +466,38 @@ func TestClient_RPC_TLS(t *testing.T) {
}) })
} }
func newClient(t *testing.T, config *Config) (*Client, error) { func newClient(t *testing.T, config *Config) *Client {
t.Helper()
c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil) c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil)
if err != nil { require.NoError(t, err, "failed to create tls configuration")
return nil, err
}
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{ logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
Level: hclog.Debug, Level: hclog.Debug,
Output: testutil.NewLogBuffer(t), Output: testutil.NewLogBuffer(t),
}) })
return NewClient(config, WithLogger(logger), WithTLSConfigurator(c)) client, err := NewClient(config, WithLogger(logger), WithTLSConfigurator(c))
require.NoError(t, err, "failed to create client")
t.Cleanup(func() {
client.Shutdown()
})
return client
} }
func TestClient_RPC_RateLimit(t *testing.T) { func TestClient_RPC_RateLimit(t *testing.T) {
t.Parallel() t.Parallel()
dir1, conf1 := testServerConfig(t) _, conf1 := testServerConfig(t)
s1, err := newServer(t, conf1) s1, err := newServer(t, conf1)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
dir2, conf2 := testClientConfig(t) _, conf2 := testClientConfig(t)
defer conf2.NotifyShutdown()
conf2.RPCRate = 2 conf2.RPCRate = 2
conf2.RPCMaxBurst = 2 conf2.RPCMaxBurst = 2
c1, err := newClient(t, conf2) c1 := newClient(t, conf2)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir2)
defer c1.Shutdown()
joinLAN(t, c1, s1) joinLAN(t, c1, s1)
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -565,21 +551,14 @@ func TestClient_SnapshotRPC(t *testing.T) {
func TestClient_SnapshotRPC_RateLimit(t *testing.T) { func TestClient_SnapshotRPC_RateLimit(t *testing.T) {
t.Parallel() t.Parallel()
dir1, s1 := testServer(t) _, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
dir2, conf1 := testClientConfig(t) _, conf1 := testClientConfig(t)
defer conf1.NotifyShutdown()
conf1.RPCRate = 2 conf1.RPCRate = 2
conf1.RPCMaxBurst = 2 conf1.RPCMaxBurst = 2
c1, err := newClient(t, conf1) c1 := newClient(t, conf1)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir2)
defer c1.Shutdown()
joinLAN(t, c1, s1) joinLAN(t, c1, s1)
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -602,7 +581,7 @@ func TestClient_SnapshotRPC_RateLimit(t *testing.T) {
func TestClient_SnapshotRPC_TLS(t *testing.T) { func TestClient_SnapshotRPC_TLS(t *testing.T) {
t.Parallel() t.Parallel()
dir1, conf1 := testServerConfig(t) _, conf1 := testServerConfig(t)
conf1.VerifyIncoming = true conf1.VerifyIncoming = true
conf1.VerifyOutgoing = true conf1.VerifyOutgoing = true
configureTLS(conf1) configureTLS(conf1)
@ -610,19 +589,12 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
dir2, conf2 := testClientConfig(t) _, conf2 := testClientConfig(t)
defer conf2.NotifyShutdown()
conf2.VerifyOutgoing = true conf2.VerifyOutgoing = true
configureTLS(conf2) configureTLS(conf2)
c1, err := newClient(t, conf2) c1 := newClient(t, conf2)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Wait for the leader // Wait for the leader
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -119,9 +119,6 @@ type Config struct {
// configured at this point. // configured at this point.
NotifyListen func() NotifyListen func()
// NotifyShutdown is called after Server is completely Shutdown.
NotifyShutdown func()
// RPCAddr is the RPC address used by Consul. This should be reachable // RPCAddr is the RPC address used by Consul. This should be reachable
// by the WAN and LAN // by the WAN and LAN
RPCAddr *net.TCPAddr RPCAddr *net.TCPAddr

View File

@ -1286,8 +1286,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
} }
}() }()
dir, config := testServerConfig(t) _, config := testServerConfig(t)
defer os.RemoveAll(dir)
config.Build = "1.6.0" config.Build = "1.6.0"
config.ConfigEntryBootstrap = []structs.ConfigEntry{ config.ConfigEntryBootstrap = []structs.ConfigEntry{
&structs.ServiceSplitterConfigEntry{ &structs.ServiceSplitterConfigEntry{

View File

@ -933,8 +933,8 @@ func (s *Server) Shutdown() error {
s.acls.Close() s.acls.Close()
} }
if s.config.NotifyShutdown != nil { if s.fsm != nil {
s.config.NotifyShutdown() s.fsm.State().Abandon()
} }
return nil return nil

View File

@ -129,18 +129,15 @@ func waitForLeaderEstablishment(t *testing.T, servers ...*Server) {
func testServerConfig(t *testing.T) (string, *Config) { func testServerConfig(t *testing.T) (string, *Config) {
dir := testutil.TempDir(t, "consul") dir := testutil.TempDir(t, "consul")
t.Cleanup(func() {
os.RemoveAll(dir)
})
config := DefaultConfig() config := DefaultConfig()
ports := freeport.MustTake(3) ports := freeport.MustTake(3)
t.Cleanup(func() {
returnPortsFn := func() {
// The method of plumbing this into the server shutdown hook doesn't
// cover all exit points, so we insulate this against multiple
// invocations and then it's safe to call it a bunch of times.
freeport.Return(ports) freeport.Return(ports)
config.NotifyShutdown = nil // self-erasing })
}
config.NotifyShutdown = returnPortsFn
config.NodeName = uniqueNodeName(t.Name()) config.NodeName = uniqueNodeName(t.Name())
config.Bootstrap = true config.Bootstrap = true
@ -154,7 +151,6 @@ func testServerConfig(t *testing.T) (string, *Config) {
nodeID, err := uuid.GenerateUUID() nodeID, err := uuid.GenerateUUID()
if err != nil { if err != nil {
returnPortsFn()
t.Fatal(err) t.Fatal(err)
} }
config.NodeID = types.NodeID(nodeID) config.NodeID = types.NodeID(nodeID)
@ -211,9 +207,6 @@ func testServerConfig(t *testing.T) (string, *Config) {
"IntermediateCertTTL": "288h", "IntermediateCertTTL": "288h",
}, },
} }
config.NotifyShutdown = returnPortsFn
return dir, config return dir, config
} }
@ -270,8 +263,6 @@ func testServerWithConfig(t *testing.T, cb func(*Config)) (string, *Server) {
var err error var err error
srv, err = newServer(t, config) srv, err = newServer(t, config)
if err != nil { if err != nil {
config.NotifyShutdown()
os.RemoveAll(dir)
r.Fatalf("err: %v", err) r.Fatalf("err: %v", err)
} }
}) })
@ -281,7 +272,6 @@ func testServerWithConfig(t *testing.T, cb func(*Config)) (string, *Server) {
// cb is a function that can alter the test servers configuration prior to the server starting. // cb is a function that can alter the test servers configuration prior to the server starting.
func testACLServerWithConfig(t *testing.T, cb func(*Config), initReplicationToken bool) (string, *Server, rpc.ClientCodec) { func testACLServerWithConfig(t *testing.T, cb func(*Config), initReplicationToken bool) (string, *Server, rpc.ClientCodec) {
dir, srv := testServerWithConfig(t, testServerACLConfig(cb)) dir, srv := testServerWithConfig(t, testServerACLConfig(cb))
t.Cleanup(func() { os.RemoveAll(dir) })
t.Cleanup(func() { srv.Shutdown() }) t.Cleanup(func() { srv.Shutdown() })
if initReplicationToken { if initReplicationToken {
@ -342,8 +332,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) {
func TestServer_StartStop(t *testing.T) { func TestServer_StartStop(t *testing.T) {
t.Parallel() t.Parallel()
// Start up a server and then stop it. // Start up a server and then stop it.
dir1, s1 := testServer(t) _, s1 := testServer(t)
defer os.RemoveAll(dir1)
if err := s1.Shutdown(); err != nil { if err := s1.Shutdown(); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -357,20 +346,18 @@ func TestServer_StartStop(t *testing.T) {
func TestServer_fixupACLDatacenter(t *testing.T) { func TestServer_fixupACLDatacenter(t *testing.T) {
t.Parallel() t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) { _, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "aye" c.Datacenter = "aye"
c.PrimaryDatacenter = "aye" c.PrimaryDatacenter = "aye"
c.ACLsEnabled = true c.ACLsEnabled = true
}) })
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) { _, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "bee" c.Datacenter = "bee"
c.PrimaryDatacenter = "aye" c.PrimaryDatacenter = "aye"
c.ACLsEnabled = true c.ACLsEnabled = true
}) })
defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
// Try to join // Try to join
@ -1081,7 +1068,7 @@ func TestServer_RPC(t *testing.T) {
func TestServer_JoinLAN_TLS(t *testing.T) { func TestServer_JoinLAN_TLS(t *testing.T) {
t.Parallel() t.Parallel()
dir1, conf1 := testServerConfig(t) _, conf1 := testServerConfig(t)
conf1.VerifyIncoming = true conf1.VerifyIncoming = true
conf1.VerifyOutgoing = true conf1.VerifyOutgoing = true
configureTLS(conf1) configureTLS(conf1)
@ -1089,11 +1076,10 @@ func TestServer_JoinLAN_TLS(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1") testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
dir2, conf2 := testServerConfig(t) _, conf2 := testServerConfig(t)
conf2.Bootstrap = false conf2.Bootstrap = false
conf2.VerifyIncoming = true conf2.VerifyIncoming = true
conf2.VerifyOutgoing = true conf2.VerifyOutgoing = true
@ -1102,7 +1088,6 @@ func TestServer_JoinLAN_TLS(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
// Try to join // Try to join
@ -1480,14 +1465,13 @@ func TestServer_Reload(t *testing.T) {
func TestServer_RPC_RateLimit(t *testing.T) { func TestServer_RPC_RateLimit(t *testing.T) {
t.Parallel() t.Parallel()
dir1, conf1 := testServerConfig(t) _, conf1 := testServerConfig(t)
conf1.RPCRate = 2 conf1.RPCRate = 2
conf1.RPCMaxBurst = 2 conf1.RPCMaxBurst = 2
s1, err := newServer(t, conf1) s1, err := newServer(t, conf1)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -1501,7 +1485,7 @@ func TestServer_RPC_RateLimit(t *testing.T) {
func TestServer_CALogging(t *testing.T) { func TestServer_CALogging(t *testing.T) {
t.Parallel() t.Parallel()
dir1, conf1 := testServerConfig(t) _, conf1 := testServerConfig(t)
// Setup dummy logger to catch output // Setup dummy logger to catch output
var buf bytes.Buffer var buf bytes.Buffer
@ -1517,7 +1501,6 @@ func TestServer_CALogging(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")