From cec5d7239621e0732b3f70158addb1899442acb3 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Tue, 7 Aug 2018 01:46:09 +0200
Subject: [PATCH] BUGFIX: Unit test relying on WaitForLeader() did not work due to wrong test (#4472)

- Improve resilience of testrpc.WaitForLeader()
- Add additional retry to CI
- Increase "go test" timeout to 8m
- Add wait for cluster leader to several tests in the agent package (pattern illustrated in the editor's note below)
- Add retry to some tests in the api and command packages
---
 GNUmakefile                                   |  15 +-
 agent/acl_endpoint_test.go                    |  12 +
 agent/acl_test.go                             |  16 ++
 agent/agent_endpoint_test.go                  | 107 ++++++++-
 agent/catalog_endpoint_test.go                |  10 +-
 agent/consul/acl_replication_test.go          |   7 +
 agent/consul/acl_test.go                      |   3 +
 agent/consul/autopilot_test.go                |   4 +-
 agent/consul/session_endpoint_test.go         |   5 +-
 agent/dns_test.go                             |  71 ++++++
 agent/health_endpoint_test.go                 |  16 +-
 agent/local/state_test.go                     |  25 +-
 agent/proxy/daemon_test.go                    |   8 +-
 agent/remote_exec_test.go                     | 142 +++++++-----
 agent/session_endpoint_test.go                | 217 ++++++++++--------
 api/catalog_test.go                           |   4 +-
 api/lock_test.go                              |  54 +++--
 api/operator_autopilot_test.go                |  64 +++---
 .../list/nodes/catalog_list_nodes_test.go     |   2 +
 command/connect/ca/set/connect_ca_set_test.go |   2 +
 command/exec/exec_test.go                     |   3 +-
 command/lock/lock_test.go                     |  19 ++
 testrpc/wait.go                               |   4 +-
 23 files changed, 564 insertions(+), 246 deletions(-)

diff --git a/GNUmakefile b/GNUmakefile
index 7ca17deae3..2f43a69bd7 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -162,7 +162,7 @@ test-internal:
 	@# hide it from travis as it exceeds their log limits and causes job to be
 	@# terminated (over 4MB and over 10k lines in the UI). We need to output
 	@# _something_ to stop them terminating us due to inactivity...
-	{ go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m $(GOTEST_PKGS) 2>&1 ; echo $$? > exit-code ; } | tee test.log | egrep '^(ok|FAIL)\s*github.com/hashicorp/consul'
+	{ go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' $(GOTEST_PKGS) 2>&1 ; echo $$? > exit-code ; } | tee test.log | egrep '^(ok|FAIL|panic:|--- FAIL)'
 	@echo "Exit code: $$(cat exit-code)"
 	@# This prints all the race report between ====== lines
 	@awk '/^WARNING: DATA RACE/ {do_print=1; print "=================="} do_print==1 {print} /^={10,}/ {do_print=0}' test.log || true
@@ -182,12 +182,17 @@ test-race:
 
 # Run tests with config for CI so `make test` can still be local-dev friendly.
 test-ci: other-consul dev-build vet test-install-deps
-	@ if ! GOTEST_FLAGS="-p 3 -parallel 1" make test-internal; then \
+	@ if ! GOTEST_FLAGS="-short -timeout 8m -p 3 -parallel 4" make test-internal; then \
 	   echo " ============"; \
-	   echo " Retrying"; \
+	   echo " Retrying 1/2"; \
 	   echo " ============"; \
-	   GOTEST_FLAGS="-p 5 -parallel 1" make test-internal; \
-	fi
+	   if ! GOTEST_FLAGS="-timeout 8m -p 1 -parallel 1" make test-internal; then \
+	      echo " ============"; \
+	      echo " Retrying 2/2"; \
+	      echo " ============"; \
+	      GOTEST_FLAGS="-timeout 9m -p 1 -parallel 1" make test-internal; \
+	   fi \
+	fi
 
 other-consul:
 	@echo "--> Checking for other consul instances"
diff --git a/agent/acl_endpoint_test.go b/agent/acl_endpoint_test.go
index aa15c48686..d4cf6da85a 100644
--- a/agent/acl_endpoint_test.go
+++ b/agent/acl_endpoint_test.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/testrpc"
 )
 
 func TestACL_Disabled_Response(t *testing.T) {
@@ -29,6 +30,7 @@ func TestACL_Disabled_Response(t *testing.T) {
 		a.srv.ACLReplicationStatus,
 		a.srv.AgentToken, // See TestAgent_Token.
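
[Editor's note -- illustrative sketch only, not part of the patch. The change applied throughout the agent package is one recurring pattern: start the test agent, block on testrpc.WaitForLeader() until the datacenter has elected a Raft leader, and only then exercise the endpoint under test. The test name below is hypothetical; NewTestAgent, testrpc.WaitForLeader, and the /v1/agent/members handler are taken from the patch itself, and the sketch assumes the package's usual test imports (net/http, testing, github.com/hashicorp/consul/testrpc).]

	func TestExample_WaitsForLeader(t *testing.T) {
		t.Parallel()
		a := NewTestAgent(t.Name(), "")
		defer a.Shutdown()

		// Block until dc1 has elected a leader; without this the request
		// below can race leader election and fail intermittently.
		testrpc.WaitForLeader(t, a.RPC, "dc1")

		req, _ := http.NewRequest("GET", "/v1/agent/members", nil)
		if _, err := a.srv.AgentMembers(nil, req); err != nil {
			t.Fatalf("err: %v", err)
		}
	}
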
} + testrpc.WaitForLeader(t, a.RPC, "dc1") for i, tt := range tests { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { req, _ := http.NewRequest("PUT", "/should/not/care", nil) @@ -86,6 +88,7 @@ func TestACL_Bootstrap(t *testing.T) { {"bootstrap", "PUT", http.StatusOK, true}, {"not again", "PUT", http.StatusForbidden, false}, } + testrpc.WaitForLeader(t, a.RPC, "dc1") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { resp := httptest.NewRecorder() @@ -119,6 +122,7 @@ func TestACL_Update(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestACL(t, a.srv) body := bytes.NewBuffer(nil) @@ -160,6 +164,8 @@ func TestACL_UpdateUpsert(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/acl/update?token=root", body) resp := httptest.NewRecorder() + + testrpc.WaitForLeader(t, a.RPC, "dc1") obj, err := a.srv.ACLUpdate(resp, req) if err != nil { t.Fatalf("err: %v", err) @@ -175,6 +181,7 @@ func TestACL_Destroy(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestACL(t, a.srv) req, _ := http.NewRequest("PUT", "/v1/acl/destroy/"+id+"?token=root", nil) resp := httptest.NewRecorder() @@ -206,6 +213,7 @@ func TestACL_Clone(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestACL(t, a.srv) req, _ := http.NewRequest("PUT", "/v1/acl/clone/"+id, nil) @@ -252,6 +260,7 @@ func TestACL_Get(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/acl/info/nope", nil) resp := httptest.NewRecorder() + testrpc.WaitForLeader(t, a.RPC, "dc1") obj, err := a.srv.ACLGet(resp, req) if err != nil { t.Fatalf("err: %v", err) @@ -269,6 +278,7 @@ func TestACL_Get(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestACL(t, a.srv) req, _ := http.NewRequest("GET", "/v1/acl/info/"+id, nil) @@ -292,6 +302,7 @@ func TestACL_List(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") var ids []string for i := 0; i < 10; i++ { ids = append(ids, makeTestACL(t, a.srv)) @@ -321,6 +332,7 @@ func TestACLReplicationStatus(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/acl/replication", nil) resp := httptest.NewRecorder() + testrpc.WaitForLeader(t, a.RPC, "dc1") obj, err := a.srv.ACLReplicationStatus(resp, req) if err != nil { t.Fatalf("err: %v", err) diff --git a/agent/acl_test.go b/agent/acl_test.go index 6b912646e3..8740f1278b 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -10,6 +10,7 @@ import ( rawacl "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" @@ -58,6 +59,7 @@ func TestACL_Version8(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{ getPolicyFn: func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { t.Fatalf("should not have called to server") @@ -79,6 +81,7 @@ func TestACL_Version8(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") var called bool m := MockServer{ getPolicyFn: func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { @@ -108,6 +111,7 @@ func TestACL_Disabled(t *testing.T) { `) defer a.Shutdown() + 
testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{ // Fetch a token without ACLs enabled and make sure the manager sees it. getPolicyFn: func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { @@ -162,6 +166,7 @@ func TestACL_Special_IDs(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{ // An empty ID should get mapped to the anonymous token. getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { @@ -220,6 +225,7 @@ func TestACL_Down_Deny(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{ // Resolve with ACLs down. getPolicyFn: func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { @@ -250,6 +256,7 @@ func TestACL_Down_Allow(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{ // Resolve with ACLs down. getPolicyFn: func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { @@ -282,6 +289,7 @@ func TestACL_Down_Extend(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{ // Populate the cache for one of the tokens. getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { @@ -360,6 +368,7 @@ func TestACL_Cache(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{ // Populate the cache for one of the tokens. getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { @@ -542,6 +551,7 @@ func TestACL_vetServiceRegister(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{catalogPolicy} if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err) @@ -587,6 +597,7 @@ func TestACL_vetServiceUpdate(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{catalogPolicy} if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err) @@ -622,6 +633,7 @@ func TestACL_vetCheckRegister(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{catalogPolicy} if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err) @@ -704,6 +716,7 @@ func TestACL_vetCheckUpdate(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{catalogPolicy} if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err) @@ -759,6 +772,7 @@ func TestACL_filterMembers(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{catalogPolicy} if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err) @@ -794,6 +808,7 @@ func TestACL_filterServices(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{catalogPolicy} if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err) @@ -824,6 +839,7 @@ func TestACL_filterChecks(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := MockServer{catalogPolicy} if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err) diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index f7e8535ab9..abaaacb355 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logger" + "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil/retry" 
"github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" @@ -53,6 +54,7 @@ func TestAgent_Services(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") srv1 := &structs.NodeService{ ID: "mysql", Service: "mysql", @@ -102,6 +104,7 @@ func TestAgent_Services_ExternalConnectProxy(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") srv1 := &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, ID: "db-proxy", @@ -126,6 +129,7 @@ func TestAgent_Services_ACLFilter(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") srv1 := &structs.NodeService{ ID: "mysql", Service: "mysql", @@ -164,6 +168,7 @@ func TestAgent_Checks(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk1 := &structs.HealthCheck{ Node: a.Config.NodeName, CheckID: "mysql", @@ -191,6 +196,7 @@ func TestAgent_Checks_ACLFilter(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk1 := &structs.HealthCheck{ Node: a.Config.NodeName, CheckID: "mysql", @@ -233,6 +239,7 @@ func TestAgent_Self(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/agent/self", nil) obj, err := a.srv.AgentSelf(nil, req) if err != nil { @@ -266,6 +273,7 @@ func TestAgent_Self_ACLDeny(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/agent/self", nil) if _, err := a.srv.AgentSelf(nil, req); !acl.IsErrPermissionDenied(err) { @@ -294,6 +302,7 @@ func TestAgent_Metrics_ACLDeny(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/agent/metrics", nil) if _, err := a.srv.AgentMetrics(nil, req); !acl.IsErrPermissionDenied(err) { @@ -319,6 +328,7 @@ func TestAgent_Metrics_ACLDeny(t *testing.T) { func TestAgent_Reload(t *testing.T) { t.Parallel() + dc1 := "dc1" a := NewTestAgent(t.Name(), ` acl_enforce_version_8 = false services = [ @@ -328,7 +338,7 @@ func TestAgent_Reload(t *testing.T) { ] watches = [ { - datacenter = "dc1" + datacenter = "`+dc1+`" type = "key" key = "test" handler = "true" @@ -341,6 +351,7 @@ func TestAgent_Reload(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, dc1) if a.State.Service("redis") == nil { t.Fatal("missing redis service") } @@ -393,6 +404,7 @@ func TestAgent_Reload_ACLDeny(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/agent/reload", nil) if _, err := a.srv.AgentReload(nil, req); !acl.IsErrPermissionDenied(err) { @@ -419,6 +431,7 @@ func TestAgent_Members(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/agent/members", nil) obj, err := a.srv.AgentMembers(nil, req) if err != nil { @@ -439,6 +452,7 @@ func TestAgent_Members_WAN(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/agent/members?wan=true", nil) 
obj, err := a.srv.AgentMembers(nil, req) if err != nil { @@ -459,6 +473,7 @@ func TestAgent_Members_ACLFilter(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/agent/members", nil) obj, err := a.srv.AgentMembers(nil, req) @@ -490,6 +505,8 @@ func TestAgent_Join(t *testing.T) { defer a1.Shutdown() a2 := NewTestAgent(t.Name(), "") defer a2.Shutdown() + testrpc.WaitForLeader(t, a1.RPC, "dc1") + testrpc.WaitForLeader(t, a2.RPC, "dc1") addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN) req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/agent/join/%s", addr), nil) @@ -518,6 +535,8 @@ func TestAgent_Join_WAN(t *testing.T) { defer a1.Shutdown() a2 := NewTestAgent(t.Name(), "") defer a2.Shutdown() + testrpc.WaitForLeader(t, a1.RPC, "dc1") + testrpc.WaitForLeader(t, a2.RPC, "dc1") addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortWAN) req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/agent/join/%s?wan=true", addr), nil) @@ -546,6 +565,8 @@ func TestAgent_Join_ACLDeny(t *testing.T) { defer a1.Shutdown() a2 := NewTestAgent(t.Name(), "") defer a2.Shutdown() + testrpc.WaitForLeader(t, a1.RPC, "dc1") + testrpc.WaitForLeader(t, a2.RPC, "dc1") addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN) @@ -584,6 +605,7 @@ func TestAgent_JoinLANNotify(t *testing.T) { t.Parallel() a1 := NewTestAgent(t.Name(), "") defer a1.Shutdown() + testrpc.WaitForLeader(t, a1.RPC, "dc1") a2 := NewTestAgent(t.Name(), ` server = false @@ -609,6 +631,7 @@ func TestAgent_Leave(t *testing.T) { t.Parallel() a1 := NewTestAgent(t.Name(), "") defer a1.Shutdown() + testrpc.WaitForLeader(t, a1.RPC, "dc1") a2 := NewTestAgent(t.Name(), ` server = false @@ -644,6 +667,7 @@ func TestAgent_Leave_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/agent/leave", nil) @@ -675,6 +699,8 @@ func TestAgent_ForceLeave(t *testing.T) { a1 := NewTestAgent(t.Name(), "") defer a1.Shutdown() a2 := NewTestAgent(t.Name(), "") + testrpc.WaitForLeader(t, a1.RPC, "dc1") + testrpc.WaitForLeader(t, a2.RPC, "dc1") // Join first addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN) @@ -685,6 +711,13 @@ func TestAgent_ForceLeave(t *testing.T) { // this test probably needs work a2.Shutdown() + // Wait for agent being marked as failed, so we wait for full shutdown of Agent + retry.Run(t, func(r *retry.R) { + m := a1.LANMembers() + if got, want := m[1].Status, serf.StatusFailed; got != want { + r.Fatalf("got status %q want %q", got, want) + } + }) // Force leave now req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/agent/force-leave/%s", a2.Config.NodeName), nil) @@ -708,6 +741,7 @@ func TestAgent_ForceLeave_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/agent/force-leave/nope", nil) @@ -736,6 +770,7 @@ func TestAgent_RegisterCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.CheckDefinition{ Name: "test", @@ -780,6 +815,7 @@ func TestAgent_RegisterCheck_Scripts(t *testing.T) { enable_script_checks = true `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") tests := 
[]struct { name string @@ -862,6 +898,7 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.CheckDefinition{ Name: "test", @@ -897,6 +934,7 @@ func TestAgent_RegisterCheck_BadStatus(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.CheckDefinition{ Name: "test", @@ -917,6 +955,7 @@ func TestAgent_RegisterCheck_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.CheckDefinition{ Name: "test", @@ -942,6 +981,7 @@ func TestAgent_DeregisterCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} if err := a.AddCheck(chk, nil, false, ""); err != nil { @@ -967,6 +1007,7 @@ func TestAgent_DeregisterCheckACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} if err := a.AddCheck(chk, nil, false, ""); err != nil { @@ -992,6 +1033,7 @@ func TestAgent_PassCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1019,6 +1061,7 @@ func TestAgent_PassCheck_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1045,6 +1088,7 @@ func TestAgent_WarnCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1072,6 +1116,7 @@ func TestAgent_WarnCheck_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1098,6 +1143,7 @@ func TestAgent_FailCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1125,6 +1171,7 @@ func TestAgent_FailCheck_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1151,6 +1198,7 @@ func TestAgent_UpdateCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := &structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1234,6 +1282,7 @@ func TestAgent_UpdateCheck_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") chk := 
&structs.HealthCheck{Name: "test", CheckID: "test"} chkType := &structs.CheckType{TTL: 15 * time.Second} @@ -1262,6 +1311,7 @@ func TestAgent_RegisterService(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ServiceDefinition{ Name: "test", @@ -1318,6 +1368,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") json := `{"name":"test", "port":8000, "enable_tag_override": true, "meta": {"some": "meta"}}` req, _ := http.NewRequest("PUT", "/v1/agent/service/register", strings.NewReader(json)) @@ -1346,6 +1397,7 @@ func TestAgent_RegisterService_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ServiceDefinition{ Name: "test", @@ -1383,6 +1435,7 @@ func TestAgent_RegisterService_InvalidAddress(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") for _, addr := range []string{"0.0.0.0", "::", "[::]"} { t.Run("addr "+addr, func(t *testing.T) { @@ -1421,6 +1474,7 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) { } `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a proxy. Note that the destination doesn't exist here on // this agent or in the catalog at all. This is intended and part @@ -1478,6 +1532,7 @@ func TestAgent_RegisterService_ManagedConnectProxy_Disabled(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), ``) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a proxy. Note that the destination doesn't exist here on // this agent or in the catalog at all. This is intended and part @@ -1516,6 +1571,7 @@ func TestAgent_RegisterService_UnmanagedConnectProxy(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a proxy. Note that the destination doesn't exist here on // this agent or in the catalog at all. This is intended and part @@ -1555,6 +1611,7 @@ func TestAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ServiceDefinition{ Kind: structs.ServiceKindConnectProxy, @@ -1585,6 +1642,7 @@ func TestAgent_RegisterService_ConnectNative(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a proxy. Note that the destination doesn't exist here on // this agent or in the catalog at all. 
This is intended and part @@ -1616,6 +1674,7 @@ func TestAgent_DeregisterService(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") service := &structs.NodeService{ ID: "test", @@ -1648,6 +1707,7 @@ func TestAgent_DeregisterService_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") service := &structs.NodeService{ ID: "test", @@ -1684,6 +1744,7 @@ func TestAgent_DeregisterService_withManagedProxy(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -1739,6 +1800,7 @@ func TestAgent_DeregisterService_managedProxyDirect(t *testing.T) { `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -1778,6 +1840,7 @@ func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("not enabled", func(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/agent/service/maintenance/test", nil) @@ -1817,6 +1880,7 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the service service := &structs.NodeService{ @@ -1859,6 +1923,7 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the service service := &structs.NodeService{ @@ -1895,6 +1960,7 @@ func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the service. 
service := &structs.NodeService{ @@ -1924,6 +1990,7 @@ func TestAgent_NodeMaintenance_BadRequest(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Fails when no enable flag provided req, _ := http.NewRequest("PUT", "/v1/agent/self/maintenance", nil) @@ -1940,6 +2007,7 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Force the node into maintenance mode req, _ := http.NewRequest("PUT", "/v1/agent/self/maintenance?enable=true&reason=broken&token=mytoken", nil) @@ -1972,6 +2040,7 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Force the node into maintenance mode a.EnableNodeMaintenance("", "") @@ -1996,6 +2065,7 @@ func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") t.Run("no token", func(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/agent/self/maintenance?enable=true&reason=broken", nil) @@ -2016,6 +2086,7 @@ func TestAgent_RegisterCheck_Service(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ServiceDefinition{ Name: "memcache", @@ -2128,6 +2199,7 @@ func TestAgent_Monitor_ACLDeny(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Try without a token. req, _ := http.NewRequest("GET", "/v1/agent/monitor", nil) @@ -2153,6 +2225,7 @@ func TestAgent_Token(t *testing.T) { acl_agent_master_token = "" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") type tokens struct { user, agent, master, repl string @@ -2307,6 +2380,7 @@ func TestAgentConnectCARoots_empty(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), "connect { enabled = false }") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/roots", nil) resp := httptest.NewRecorder() @@ -2322,6 +2396,7 @@ func TestAgentConnectCARoots_list(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Set some CAs. Note that NewTestAgent already bootstraps one CA so this just // adds a second and makes it active. 
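
[Editor's note -- illustrative sketch only, not part of the patch. Where the asserted condition is eventually consistent rather than simply leader-dependent (for example, serf membership converging after a join or shutdown), the patch wraps the assertion in retry.Run from testutil/retry instead of, or in addition to, testrpc.WaitForLeader. The agent names a1/a2 are hypothetical; retry.Run, retry.R, and LANMembers are used this way elsewhere in the patch, and the sketch assumes the testutil/retry import already present in this file.]

	retry.Run(t, func(r *retry.R) {
		// Re-evaluated until it succeeds or the retryer gives up.
		if got, want := len(a1.LANMembers()), 2; got != want {
			r.Fatalf("got %d LAN members, want %d", got, want)
		}
	})
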
@@ -2398,6 +2473,7 @@ func TestAgentConnectCALeafCert_aclDefaultDeny(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -2434,6 +2510,7 @@ func TestAgentConnectCALeafCert_aclProxyToken(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -2479,6 +2556,7 @@ func TestAgentConnectCALeafCert_aclProxyTokenOther(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -2543,6 +2621,7 @@ func TestAgentConnectCALeafCert_aclServiceWrite(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -2600,6 +2679,7 @@ func TestAgentConnectCALeafCert_aclServiceReadDeny(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -2655,6 +2735,7 @@ func TestAgentConnectCALeafCert_good(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // CA already setup by default by NewTestAgent but force a new one so we can // verify it was signed easily. @@ -2756,6 +2837,7 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // CA already setup by default by NewTestAgent but force a new one so we can // verify it was signed easily. @@ -2870,6 +2952,7 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) { a := NewTestAgent(t.Name(), testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Define a local service with a managed proxy. 
It's registered in the test // loop to make sure agent state is predictable whatever order tests execute @@ -3072,6 +3155,7 @@ func TestAgentConnectProxyConfig_aclDefaultDeny(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -3107,6 +3191,7 @@ func TestAgentConnectProxyConfig_aclProxyToken(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -3154,6 +3239,7 @@ func TestAgentConnectProxyConfig_aclServiceWrite(t *testing.T) { require := require.New(t) a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { @@ -3213,6 +3299,7 @@ func TestAgentConnectProxyConfig_aclServiceReadDeny(t *testing.T) { a := NewTestAgent(t.Name(), TestACLConfig()+testAllowProxyConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a service with a managed proxy { reg := &structs.ServiceDefinition{ @@ -3286,7 +3373,7 @@ func TestAgentConnectProxyConfig_ConfigHandling(t *testing.T) { TTL: 15 * time.Second, }, Connect: &structs.ServiceConnect{ - // Proxy is populated with the definition in the table below. + // Proxy is populated with the definition in the table below. }, } @@ -3577,6 +3664,7 @@ func TestAgentConnectProxyConfig_ConfigHandling(t *testing.T) { a := NewTestAgent(t.Name(), tt.globalConfig) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the basic service with the required config { @@ -3620,6 +3708,7 @@ func TestAgentConnectAuthorize_badBody(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := []string{} req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() @@ -3636,6 +3725,7 @@ func TestAgentConnectAuthorize_noTarget(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ConnectAuthorizeRequest{} req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() @@ -3653,6 +3743,7 @@ func TestAgentConnectAuthorize_idInvalidFormat(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ConnectAuthorizeRequest{ Target: "web", ClientCertURI: "tubes", @@ -3676,6 +3767,7 @@ func TestAgentConnectAuthorize_idNotService(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ConnectAuthorizeRequest{ Target: "web", ClientCertURI: "spiffe://1234.consul", @@ -3699,6 +3791,7 @@ func TestAgentConnectAuthorize_allow(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") target := "db" // Create some intentions @@ -3795,6 +3888,7 @@ func TestAgentConnectAuthorize_deny(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") target := "db" // Create some intentions @@ -3838,6 +3932,7 @@ func TestAgentConnectAuthorize_denyTrustDomain(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") target := "db" // 
Create some intentions @@ -3880,6 +3975,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") target := "db" @@ -3959,6 +4055,7 @@ func TestAgentConnectAuthorize_serviceWrite(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Create an ACL var token string @@ -3996,6 +4093,7 @@ func TestAgentConnectAuthorize_defaultDeny(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), TestACLConfig()) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.ConnectAuthorizeRequest{ Target: "foo", @@ -4017,8 +4115,9 @@ func TestAgentConnectAuthorize_defaultAllow(t *testing.T) { t.Parallel() assert := assert.New(t) + dc1 := "dc1" a := NewTestAgent(t.Name(), ` - acl_datacenter = "dc1" + acl_datacenter = "`+dc1+`" acl_default_policy = "allow" acl_master_token = "root" acl_agent_token = "root" @@ -4026,6 +4125,7 @@ func TestAgentConnectAuthorize_defaultAllow(t *testing.T) { acl_enforce_version_8 = true `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, dc1) args := &structs.ConnectAuthorizeRequest{ Target: "foo", @@ -4036,6 +4136,7 @@ func TestAgentConnectAuthorize_defaultAllow(t *testing.T) { respRaw, err := a.srv.AgentConnectAuthorize(resp, req) assert.Nil(err) assert.Equal(200, resp.Code) + assert.NotNil(respRaw) obj := respRaw.(*connectAuthorizeResp) assert.True(obj.Authorized) diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index f97b22dbcf..68c7dabd32 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/serf/coordinate" "github.com/stretchr/testify/assert" @@ -92,6 +93,7 @@ func TestCatalogNodes(t *testing.T) { if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1", nil) resp := httptest.NewRecorder() @@ -105,7 +107,7 @@ func TestCatalogNodes(t *testing.T) { nodes := obj.(structs.Nodes) if len(nodes) != 2 { - t.Fatalf("bad: %v", obj) + t.Fatalf("bad: %v ; nodes:=%v", obj, nodes) } } @@ -170,6 +172,8 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { if _, err := a2.JoinWAN([]string{addr}); err != nil { t.Fatalf("err: %v", err) } + testrpc.WaitForLeader(t, a1.RPC, "dc1") + testrpc.WaitForLeader(t, a2.RPC, "dc2") retry.Run(t, func(r *retry.R) { if got, want := len(a1.WANMembers()), 2; got < want { r.Fatalf("got %d WAN members want at least %d", got, want) @@ -208,7 +212,7 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { // Expect that DC1 gives us a WAN address (since the node is in DC2). nodes1 := obj1.(structs.Nodes) if len(nodes1) != 2 { - t.Fatalf("bad: %v", obj1) + t.Fatalf("bad: %v, nodes:=%v", obj1, nodes1) } var address string for _, node := range nodes1 { @@ -264,6 +268,7 @@ func TestCatalogNodes_Blocking(t *testing.T) { // an error channel instead. 
errch := make(chan error, 2) go func() { + testrpc.WaitForLeader(t, a.RPC, "dc1") start := time.Now() // register a service after the blocking call @@ -326,6 +331,7 @@ func TestCatalogNodes_DistanceSort(t *testing.T) { if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } + testrpc.WaitForLeader(t, a.RPC, "dc1") args = &structs.RegisterRequest{ Datacenter: "dc1", diff --git a/agent/consul/acl_replication_test.go b/agent/consul/acl_replication_test.go index 0ffc8d00e7..34ad3015f9 100644 --- a/agent/consul/acl_replication_test.go +++ b/agent/consul/acl_replication_test.go @@ -292,6 +292,9 @@ func TestACLReplication_IsACLReplicationEnabled(t *testing.T) { }) defer os.RemoveAll(dir2) defer s2.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s2.RPC, "dc2") + if s2.IsACLReplicationEnabled() { t.Fatalf("should not be enabled") } @@ -304,6 +307,7 @@ func TestACLReplication_IsACLReplicationEnabled(t *testing.T) { }) defer os.RemoveAll(dir3) defer s3.Shutdown() + testrpc.WaitForLeader(t, s3.RPC, "dc2") if !s3.IsACLReplicationEnabled() { t.Fatalf("should be enabled") } @@ -317,6 +321,7 @@ func TestACLReplication_IsACLReplicationEnabled(t *testing.T) { }) defer os.RemoveAll(dir4) defer s4.Shutdown() + testrpc.WaitForLeader(t, s4.RPC, "dc1") if s4.IsACLReplicationEnabled() { t.Fatalf("should not be enabled") } @@ -330,6 +335,7 @@ func TestACLReplication(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") client := rpcClient(t, s1) defer client.Close() @@ -340,6 +346,7 @@ func TestACLReplication(t *testing.T) { c.ACLReplicationInterval = 10 * time.Millisecond c.ACLReplicationApplyLimit = 1000000 }) + testrpc.WaitForLeader(t, s2.RPC, "dc2") s2.tokens.UpdateACLReplicationToken("root") defer os.RemoveAll(dir2) defer s2.Shutdown() diff --git a/agent/consul/acl_test.go b/agent/consul/acl_test.go index 45048f80a1..fdd8fc8a01 100644 --- a/agent/consul/acl_test.go +++ b/agent/consul/acl_test.go @@ -49,6 +49,7 @@ func TestACL_ResolveRootACL(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") rule, err := s1.resolveToken("allow") if !acl.IsErrRootDenied(err) { @@ -74,6 +75,7 @@ func TestACL_Authority_NotFound(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") client := rpcClient(t, s1) defer client.Close() @@ -96,6 +98,7 @@ func TestACL_Authority_Found(t *testing.T) { }) defer os.RemoveAll(dir1) defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") client := rpcClient(t, s1) defer client.Close() diff --git a/agent/consul/autopilot_test.go b/agent/consul/autopilot_test.go index ea88dcca32..084b86fda6 100644 --- a/agent/consul/autopilot_test.go +++ b/agent/consul/autopilot_test.go @@ -33,8 +33,9 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) { } func testCleanupDeadServer(t *testing.T, raftVersion int) { + dc := "dc1" conf := func(c *Config) { - c.Datacenter = "dc1" + c.Datacenter = dc c.Bootstrap = false c.BootstrapExpect = 3 c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion) @@ -58,6 +59,7 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) { joinLAN(t, s3, s1) for _, s := range servers { + testrpc.WaitForLeader(t, s.RPC, dc) retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) } diff --git a/agent/consul/session_endpoint_test.go b/agent/consul/session_endpoint_test.go index d557b31849..a11699abce 100644 --- 
a/agent/consul/session_endpoint_test.go +++ b/agent/consul/session_endpoint_test.go @@ -539,8 +539,9 @@ func TestSession_ApplyTimers(t *testing.T) { } func TestSession_Renew(t *testing.T) { - t.Parallel() - ttl := time.Second + // This method is timing sensitive, disable Parallel + //t.Parallel() + ttl := 1 * time.Second TTL := ttl.String() dir1, s1 := testServerWithConfig(t, func(c *Config) { diff --git a/agent/dns_test.go b/agent/dns_test.go index 043fa87b03..e0132a554f 100644 --- a/agent/dns_test.go +++ b/agent/dns_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/serf/coordinate" "github.com/miekg/dns" @@ -143,6 +144,7 @@ func TestDNS_Over_TCP(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -175,6 +177,7 @@ func TestDNS_NodeLookup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -269,6 +272,7 @@ func TestDNS_CaseInsensitiveNodeLookup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -300,6 +304,7 @@ func TestDNS_NodeLookup_PeriodName(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node with period in name args := &structs.RegisterRequest{ @@ -339,6 +344,7 @@ func TestDNS_NodeLookup_AAAA(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -455,6 +461,7 @@ func TestDNS_NodeLookup_CNAME(t *testing.T) { recursors = ["`+recursor.Addr+`"] `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -501,6 +508,7 @@ func TestDNS_NodeLookup_CNAME(t *testing.T) { func TestDNS_NodeLookup_TXT(t *testing.T) { a := NewTestAgent(t.Name(), ``) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.RegisterRequest{ Datacenter: "dc1", @@ -546,6 +554,7 @@ func TestDNS_NodeLookup_TXT(t *testing.T) { func TestDNS_NodeLookup_TXT_DontSuppress(t *testing.T) { a := NewTestAgent(t.Name(), `dns_config = { enable_additional_node_meta_txt = false }`) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.RegisterRequest{ Datacenter: "dc1", @@ -591,6 +600,7 @@ func TestDNS_NodeLookup_TXT_DontSuppress(t *testing.T) { func TestDNS_NodeLookup_ANY(t *testing.T) { a := NewTestAgent(t.Name(), ``) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.RegisterRequest{ Datacenter: "dc1", @@ -631,6 +641,7 @@ func TestDNS_NodeLookup_ANY(t *testing.T) { func TestDNS_NodeLookup_ANY_DontSuppressTXT(t *testing.T) { a := NewTestAgent(t.Name(), `dns_config = { enable_additional_node_meta_txt = false }`) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.RegisterRequest{ Datacenter: "dc1", @@ -671,6 +682,7 @@ func TestDNS_NodeLookup_ANY_DontSuppressTXT(t *testing.T) { func TestDNS_NodeLookup_A_SuppressTXT(t *testing.T) { a := NewTestAgent(t.Name(), `dns_config = { enable_additional_node_meta_txt = false }`) 
defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") args := &structs.RegisterRequest{ Datacenter: "dc1", @@ -707,6 +719,7 @@ func TestDNS_EDNS0(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -746,6 +759,7 @@ func TestDNS_ReverseLookup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -787,6 +801,7 @@ func TestDNS_ReverseLookup_CustomDomain(t *testing.T) { domain = "custom" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -826,6 +841,7 @@ func TestDNS_ReverseLookup_IPV6(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -865,6 +881,7 @@ func TestDNS_ServiceReverseLookup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -912,6 +929,7 @@ func TestDNS_ServiceReverseLookup_IPV6(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -961,6 +979,7 @@ func TestDNS_ServiceReverseLookup_CustomDomain(t *testing.T) { domain = "custom" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -1008,6 +1027,7 @@ func TestDNS_ServiceReverseLookupNodeAddress(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -1055,6 +1075,7 @@ func TestDNS_ServiceLookupNoMultiCNAME(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -1109,6 +1130,7 @@ func TestDNS_ServiceLookupPreferNoCNAME(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -1166,6 +1188,7 @@ func TestDNS_ServiceLookupMultiAddrNoCNAME(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -1239,6 +1262,7 @@ func TestDNS_ServiceLookup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -1361,6 +1385,7 @@ func TestDNS_ServiceLookupWithInternalServiceAddress(t *testing.T) { node_name = "my.test-node" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. // The service is using the consul DNS name as service address @@ -1421,6 +1446,7 @@ func TestDNS_ConnectServiceLookup(t *testing.T) { assert := assert.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register { @@ -1464,6 +1490,7 @@ func TestDNS_ExternalServiceLookup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with an external service. 
{ @@ -1538,6 +1565,7 @@ func TestDNS_ExternalServiceToConsulCNAMELookup(t *testing.T) { node_name = "test node" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the initial node with a service { @@ -1650,6 +1678,7 @@ func TestDNS_NSRecords(t *testing.T) { node_name = "server1" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := new(dns.Msg) m.SetQuestion("something.node.consul.", dns.TypeNS) @@ -1685,6 +1714,7 @@ func TestDNS_NSRecords_IPV6(t *testing.T) { advertise_addr = "::1" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") m := new(dns.Msg) m.SetQuestion("server1.node.dc1.consul.", dns.TypeNS) @@ -1719,6 +1749,7 @@ func TestDNS_ExternalServiceToConsulCNAMENestedLookup(t *testing.T) { node_name = "test-node" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the initial node with a service { @@ -1858,6 +1889,7 @@ func TestDNS_ServiceLookup_ServiceAddress_A(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -1950,6 +1982,7 @@ func TestDNS_ServiceLookup_ServiceAddress_CNAME(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service whose address isn't an IP. { @@ -2042,6 +2075,7 @@ func TestDNS_ServiceLookup_ServiceAddressIPV6(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -2322,6 +2356,7 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -2392,6 +2427,7 @@ func TestDNS_ServiceLookup_TagPeriod(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ @@ -2461,6 +2497,7 @@ func TestDNS_PreparedQueryNearIPEDNS(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") added := 0 @@ -2587,6 +2624,7 @@ func TestDNS_PreparedQueryNearIP(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") added := 0 @@ -2691,6 +2729,7 @@ func TestDNS_ServiceLookup_PreparedQueryNamePeriod(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a node with a service. { @@ -2769,6 +2808,7 @@ func TestDNS_ServiceLookup_Dedup(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a single node with multiple instances of a service. { @@ -2871,6 +2911,7 @@ func TestDNS_ServiceLookup_Dedup_SRV(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Register a single node with multiple instances of a service. 
 	{
@@ -3008,6 +3049,7 @@ func TestDNS_Recurse(t *testing.T) {
 		recursors = ["`+recursor.Addr+`"]
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	m := new(dns.Msg)
 	m.SetQuestion("apple.com.", dns.TypeANY)
@@ -3039,6 +3081,7 @@ func TestDNS_Recurse_Truncation(t *testing.T) {
 		recursors = ["`+recursor.Addr+`"]
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	m := new(dns.Msg)
 	m.SetQuestion("apple.com.", dns.TypeANY)
@@ -3082,6 +3125,7 @@ func TestDNS_RecursorTimeout(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	m := new(dns.Msg)
 	m.SetQuestion("apple.com.", dns.TypeANY)
@@ -3115,6 +3159,7 @@ func TestDNS_ServiceLookup_FilterCritical(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register nodes with health checks in various states.
 	{
@@ -3270,6 +3315,7 @@ func TestDNS_ServiceLookup_OnlyFailing(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register nodes with all health checks in a critical state.
 	{
@@ -3386,6 +3432,7 @@ func TestDNS_ServiceLookup_OnlyPassing(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register nodes with health checks in various states.
 	{
@@ -3506,6 +3553,7 @@ func TestDNS_ServiceLookup_Randomize(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a large number of nodes.
 	for i := 0; i < generateNumNodes; i++ {
@@ -3640,6 +3688,7 @@ func TestDNS_TCP_and_UDP_Truncate(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	services := []string{"normal", "truncated"}
 	for index, service := range services {
@@ -3737,6 +3786,7 @@ func TestDNS_ServiceLookup_Truncate(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a large number of nodes.
 	for i := 0; i < generateNumNodes; i++ {
@@ -3805,6 +3855,7 @@ func TestDNS_ServiceLookup_LargeResponses(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	longServiceName := "this-is-a-very-very-very-very-very-long-name-for-a-service"
 
@@ -3907,6 +3958,7 @@ func testDNSServiceLookupResponseLimits(t *testing.T, answerLimit int, qType uin
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	for i := 0; i < generateNumNodes; i++ {
 		nodeAddress := fmt.Sprintf("127.0.0.%d", i+1)
@@ -3996,6 +4048,7 @@ func checkDNSService(t *testing.T, generateNumNodes int, aRecordLimit int, qType
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	for i := 0; i < generateNumNodes; i++ {
 		nodeAddress := fmt.Sprintf("127.0.0.%d", i+1)
@@ -4223,6 +4276,7 @@ func TestDNS_ServiceLookup_CNAME(t *testing.T) {
 		recursors = ["`+recursor.Addr+`"]
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a node with a name for an address.
 	{
@@ -4324,6 +4378,7 @@ func TestDNS_NodeLookup_TTL(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register node
 	args := &structs.RegisterRequest{
@@ -4544,6 +4599,7 @@ func TestDNS_PreparedQuery_TTL(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a node and a service.
 	{
@@ -4836,6 +4892,7 @@ func TestDNS_ServiceLookup_SRV_RFC(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register node
 	args := &structs.RegisterRequest{
@@ -4910,6 +4967,7 @@ func TestDNS_ServiceLookup_SRV_RFC_TCP_Default(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register node
 	args := &structs.RegisterRequest{
@@ -4999,6 +5057,7 @@ func TestDNS_ServiceLookup_FilterACL(t *testing.T) {
 		acl_default_policy = "deny"
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a service
 	args := &structs.RegisterRequest{
@@ -5035,6 +5094,7 @@ func TestDNS_ServiceLookup_FilterACL(t *testing.T) {
 func TestDNS_ServiceLookup_MetaTXT(t *testing.T) {
 	a := NewTestAgent(t.Name(), `dns_config = { enable_additional_node_meta_txt = true }`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	args := &structs.RegisterRequest{
 		Datacenter: "dc1",
@@ -5080,6 +5140,7 @@ func TestDNS_ServiceLookup_MetaTXT(t *testing.T) {
 func TestDNS_ServiceLookup_SuppressTXT(t *testing.T) {
 	a := NewTestAgent(t.Name(), `dns_config = { enable_additional_node_meta_txt = false }`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a node with a service.
 	args := &structs.RegisterRequest{
@@ -5123,6 +5184,7 @@ func TestDNS_AddressLookup(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Look up the addresses
 	cases := map[string]string{
@@ -5159,6 +5221,7 @@ func TestDNS_AddressLookupIPV6(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Look up the addresses
 	cases := map[string]string{
@@ -5196,6 +5259,7 @@ func TestDNS_NonExistingLookup(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// lookup a non-existing node, we should receive a SOA
 	m := new(dns.Msg)
@@ -5224,6 +5288,7 @@ func TestDNS_NonExistingLookupEmptyAorAAAA(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a v6-only service and a v4-only service.
 	{
@@ -5367,6 +5432,7 @@ func TestDNS_PreparedQuery_AllowStale(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	m := MockPreparedQuery{
 		executeFn: func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -5411,6 +5477,7 @@ func TestDNS_InvalidQueries(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Try invalid forms of queries that should hit the special invalid case
 	// of our query parser.
@@ -5452,6 +5519,7 @@ func TestDNS_PreparedQuery_AgentSource(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	m := MockPreparedQuery{
 		executeFn: func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
@@ -5957,6 +6025,7 @@ func TestDNS_Compression_Query(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register a node with a service.
 	{
@@ -6043,6 +6112,7 @@ func TestDNS_Compression_ReverseLookup(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register node.
 	args := &structs.RegisterRequest{
@@ -6101,6 +6171,7 @@ func TestDNS_Compression_Recurse(t *testing.T) {
 		recursors = ["`+recursor.Addr+`"]
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	m := new(dns.Msg)
 	m.SetQuestion("apple.com.", dns.TypeANY)
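Every dns_test.go hunk above makes the same change: block on testrpc.WaitForLeader before the test issues its first query, so lookups cannot race the initial leader election. A minimal usage sketch of that shape (the test name and body are illustrative, not part of the patch):

package agent

import (
	"testing"

	"github.com/hashicorp/consul/testrpc"
)

// Illustrative only: the pattern shared by the DNS test changes above.
func TestExample_WaitsForLeader(t *testing.T) {
	t.Parallel()
	a := NewTestAgent(t.Name(), "")
	defer a.Shutdown()

	// Retries a catalog query until dc1 reports a known leader.
	testrpc.WaitForLeader(t, a.RPC, "dc1")

	// Only now register nodes/services and issue DNS queries.
}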
diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go
index 8164be4771..fe58237640 100644
--- a/agent/health_endpoint_test.go
+++ b/agent/health_endpoint_test.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/testrpc"
 	"github.com/hashicorp/consul/testutil/retry"
 	"github.com/hashicorp/serf/coordinate"
 	"github.com/stretchr/testify/assert"
@@ -268,6 +269,7 @@ func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	req, _ := http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil)
 	resp := httptest.NewRecorder()
@@ -399,6 +401,7 @@ func TestHealthServiceNodes(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1", nil)
 	resp := httptest.NewRecorder()
@@ -465,6 +468,7 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
 	resp := httptest.NewRecorder()
@@ -517,10 +521,10 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
-
+	dc := "dc1"
 	// Create a service check
 	args := &structs.RegisterRequest{
-		Datacenter: "dc1",
+		Datacenter: dc,
 		Node:       "bar",
 		Address:    "127.0.0.1",
 		Service: &structs.NodeService{
@@ -533,7 +537,7 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) {
 			ServiceID: "test",
 		},
 	}
-
+	testrpc.WaitForLeader(t, a.RPC, dc)
 	var out struct{}
 	if err := a.RPC("Catalog.Register", args, &out); err != nil {
 		t.Fatalf("err: %v", err)
@@ -597,9 +601,10 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
 
+	dc := "dc1"
 	// Create a failing service check
 	args := &structs.RegisterRequest{
-		Datacenter: "dc1",
+		Datacenter: dc,
 		Node:       a.Config.NodeName,
 		Address:    "127.0.0.1",
 		Check: &structs.HealthCheck{
@@ -610,6 +615,7 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
 		},
 	}
 
+	testrpc.WaitForLeader(t, a.RPC, dc)
 	var out struct{}
 	if err := a.RPC("Catalog.Register", args, &out); err != nil {
 		t.Fatalf("err: %v", err)
@@ -694,6 +700,7 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) {
 		acl_datacenter = ""
 	`)
 	defer a1.Shutdown()
+	testrpc.WaitForLeader(t, a1.RPC, "dc1")
 
 	a2 := NewTestAgent(t.Name(), `
 		datacenter = "dc2"
@@ -701,6 +708,7 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) {
 		acl_datacenter = ""
 	`)
 	defer a2.Shutdown()
+	testrpc.WaitForLeader(t, a2.RPC, "dc2")
 
 	// Wait for the WAN join.
 	addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
diff --git a/agent/local/state_test.go b/agent/local/state_test.go
index 3cdf2a28a1..92f1761429 100644
--- a/agent/local/state_test.go
+++ b/agent/local/state_test.go
@@ -9,6 +9,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/consul/testrpc"
+
 	"github.com/hashicorp/go-memdb"
 
 	"github.com/hashicorp/consul/agent"
@@ -29,6 +31,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
 	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register info
 	args := &structs.RegisterRequest{
@@ -237,6 +240,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register node info
 	var out struct{}
@@ -374,6 +378,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
 	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	args := &structs.RegisterRequest{
 		Datacenter: "dc1",
@@ -485,6 +490,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
 	t.Parallel()
 	a := agent.NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	{
 		// Single check
@@ -619,6 +625,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
 		acl_enforce_version_8 = true`}
 	a.Start()
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Create the ACL
 	arg := structs.ACLRequest{
@@ -758,6 +765,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 	a.Start()
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 	// Register info
 	args := &structs.RegisterRequest{
 		Datacenter: "dc1",
@@ -947,17 +955,20 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 
 func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	t.Parallel()
+	dc := "dc1"
 	a := &agent.TestAgent{Name: t.Name(), HCL: `
-		acl_datacenter = "dc1"
+		acl_datacenter = "` + dc + `"
 		acl_master_token = "root"
 		acl_default_policy = "deny"
 		acl_enforce_version_8 = true`}
 	a.Start()
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, dc)
+
 	// Create the ACL
 	arg := structs.ACLRequest{
-		Datacenter: "dc1",
+		Datacenter: dc,
 		Op:         structs.ACLSet,
 		ACL: structs.ACL{
 			Name: "User token",
@@ -996,7 +1007,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	// Verify that we are in sync
 	{
 		req := structs.NodeSpecificRequest{
-			Datacenter: "dc1",
+			Datacenter: dc,
 			Node:       a.Config.NodeName,
 			QueryOptions: structs.QueryOptions{
 				Token: "root",
@@ -1066,7 +1077,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 
 	// Verify that we are in sync
 	req := structs.NodeSpecificRequest{
-		Datacenter: "dc1",
+		Datacenter: dc,
 		Node:       a.Config.NodeName,
 		QueryOptions: structs.QueryOptions{
 			Token: "root",
@@ -1112,7 +1123,7 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	// Verify that we are in sync
 	{
 		req := structs.NodeSpecificRequest{
-			Datacenter: "dc1",
+			Datacenter: dc,
 			Node:       a.Config.NodeName,
 			QueryOptions: structs.QueryOptions{
 				Token: "root",
@@ -1161,6 +1172,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) {
 		check_update_interval = "0s" # set to "0s" since otherwise output checks are deferred
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	inSync := func(id string) bool {
 		s := a.State.CheckState(types.CheckID(id))
@@ -1211,6 +1223,7 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 	`}
 	a.Start()
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Create a check
 	check := &structs.HealthCheck{
@@ -1401,6 +1414,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
 		}`}
 	a.Start()
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	// Register info
 	args := &structs.RegisterRequest{
@@ -1670,6 +1684,7 @@ func TestAgent_sendCoordinate(t *testing.T) {
 		}
 	`)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	t.Logf("%d %d %s",
 		a.Config.ConsulCoordinateUpdateBatchSize,
diff --git a/agent/proxy/daemon_test.go b/agent/proxy/daemon_test.go
index b2b9f5dfdc..9ea552299a 100644
--- a/agent/proxy/daemon_test.go
+++ b/agent/proxy/daemon_test.go
@@ -431,7 +431,7 @@ func TestDaemonRestart_pidFile(t *testing.T) {
 	defer d.Stop()
 
 	// Wait for the file to exist. We save the func so we can reuse the test.
-	waitFile := func() {
+	waitFile := func(path string) {
 		retry.Run(t, func(r *retry.R) {
 			_, err := os.Stat(path)
 			if err == nil {
@@ -440,7 +440,8 @@ func TestDaemonRestart_pidFile(t *testing.T) {
 			r.Fatalf("error waiting for path: %s", err)
 		})
 	}
-	waitFile()
+	waitFile(path)
+	waitFile(pidPath)
 
 	// Check the pid file
 	pidRaw, err := ioutil.ReadFile(pidPath)
@@ -451,7 +452,8 @@ func TestDaemonRestart_pidFile(t *testing.T) {
 	require.NoError(os.Remove(path))
 
 	// File should re-appear because the process is restart
-	waitFile()
+	waitFile(path)
+	waitFile(pidPath)
 
 	// Check the pid file and it should not equal
 	pidRaw2, err := ioutil.ReadFile(pidPath)
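The waitFile change above is the same testutil/retry idiom used throughout the patch: put the flaky check inside retry.Run and fail through the *retry.R handle so it can be retried. A standalone sketch of that helper (the helper name is illustrative, only the retry package usage matches the code above):

package proxy

import (
	"os"
	"testing"

	"github.com/hashicorp/consul/testutil/retry"
)

// waitForFile retries os.Stat until the file exists; the test only fails
// once retry.Run exhausts its attempts.
func waitForFile(t *testing.T, path string) {
	retry.Run(t, func(r *retry.R) {
		if _, err := os.Stat(path); err != nil {
			r.Fatalf("error waiting for path %q: %s", path, err)
		}
	})
}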
diff --git a/agent/remote_exec_test.go b/agent/remote_exec_test.go
index 7a164f130e..d957d3b147 100644
--- a/agent/remote_exec_test.go
+++ b/agent/remote_exec_test.go
@@ -11,6 +11,8 @@ import (
 
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/testrpc"
+
+	"github.com/hashicorp/consul/testutil/retry"
 	"github.com/hashicorp/go-uuid"
 )
@@ -96,42 +98,47 @@ func TestRexecWriter(t *testing.T) {
 
 func TestRemoteExecGetSpec(t *testing.T) {
 	t.Parallel()
-	testRemoteExecGetSpec(t, "", "", true)
+	testRemoteExecGetSpec(t, "", "", true, "")
 }
 
 func TestRemoteExecGetSpec_ACLToken(t *testing.T) {
 	t.Parallel()
+	dc := "dc1"
 	testRemoteExecGetSpec(t, `
-		acl_datacenter = "dc1"
+		acl_datacenter = "`+dc+`"
 		acl_master_token = "root"
 		acl_token = "root"
 		acl_default_policy = "deny"
-	`, "root", true)
+	`, "root", true, dc)
 }
 
 func TestRemoteExecGetSpec_ACLAgentToken(t *testing.T) {
 	t.Parallel()
+	dc := "dc1"
 	testRemoteExecGetSpec(t, `
-		acl_datacenter = "dc1"
+		acl_datacenter = "`+dc+`"
 		acl_master_token = "root"
 		acl_agent_token = "root"
 		acl_default_policy = "deny"
-	`, "root", true)
+	`, "root", true, dc)
 }
 
 func TestRemoteExecGetSpec_ACLDeny(t *testing.T) {
 	t.Parallel()
+	dc := "dc1"
 	testRemoteExecGetSpec(t, `
-		acl_datacenter = "dc1"
+		acl_datacenter = "`+dc+`"
 		acl_master_token = "root"
 		acl_default_policy = "deny"
-	`, "root", false)
+	`, "root", false, dc)
 }
 
-func testRemoteExecGetSpec(t *testing.T, hcl string, token string, shouldSucceed bool) {
+func testRemoteExecGetSpec(t *testing.T, hcl string, token string, shouldSucceed bool, dc string) {
 	a := NewTestAgent(t.Name(), hcl)
 	defer a.Shutdown()
-
+	if dc != "" {
+		testrpc.WaitForLeader(t, a.RPC, dc)
+	}
 	event := &remoteExecEvent{
 		Prefix:  "_rexec",
 		Session: makeRexecSession(t, a.Agent, token),
@@ -161,42 +168,50 @@ func testRemoteExecGetSpec(t *testing.T, hcl string, token string, shouldSucceed
 
 func TestRemoteExecWrites(t *testing.T) {
 	t.Parallel()
-	testRemoteExecWrites(t, "", "", true)
+	testRemoteExecWrites(t, "", "", true, "")
 }
 
 func TestRemoteExecWrites_ACLToken(t *testing.T) {
 	t.Parallel()
+	dc := "dc1"
 	testRemoteExecWrites(t, `
-		acl_datacenter = "dc1"
+		acl_datacenter = "`+dc+`"
 		acl_master_token = "root"
 		acl_token = "root"
 		acl_default_policy = "deny"
-	`, "root", true)
+	`, "root", true, dc)
 }
 
 func TestRemoteExecWrites_ACLAgentToken(t *testing.T) {
 	t.Parallel()
+	dc := "dc1"
 	testRemoteExecWrites(t, `
-		acl_datacenter = "dc1"
+		acl_datacenter = "`+dc+`"
 		acl_master_token = "root"
 		acl_agent_token = "root"
 		acl_default_policy = "deny"
-	`, "root", true)
+	`, "root", true, dc)
 }
 
 func TestRemoteExecWrites_ACLDeny(t *testing.T) {
 	t.Parallel()
+	dc := "dc1"
 	testRemoteExecWrites(t, `
-		acl_datacenter = "dc1"
+		acl_datacenter = "`+dc+`"
 		acl_master_token = "root"
 		acl_default_policy = "deny"
-	`, "root", false)
+	`, "root", false, dc)
 }
 
-func testRemoteExecWrites(t *testing.T, hcl string, token string, shouldSucceed bool) {
+func testRemoteExecWrites(t *testing.T, hcl string, token string, shouldSucceed bool, dc string) {
 	a := NewTestAgent(t.Name(), hcl)
 	defer a.Shutdown()
-
+	if dc != "" {
+		testrpc.WaitForLeader(t, a.RPC, dc)
+	} else {
+		// For slow machines, ensure we wait a bit
+		time.Sleep(1 * time.Millisecond)
+	}
 	event := &remoteExecEvent{
 		Prefix:  "_rexec",
 		Session: makeRexecSession(t, a.Agent, token),
@@ -253,57 +268,58 @@ func testRemoteExecWrites(t *testing.T, hcl string, token string, shouldSucceed
 func testHandleRemoteExec(t *testing.T, command string, expectedSubstring string, expectedReturnCode string) {
 	a := NewTestAgent(t.Name(), "")
 	defer a.Shutdown()
+	retry.Run(t, func(r *retry.R) {
+		event := &remoteExecEvent{
+			Prefix:  "_rexec",
+			Session: makeRexecSession(t, a.Agent, ""),
+		}
+		defer destroySession(t, a.Agent, event.Session, "")
 
-	event := &remoteExecEvent{
-		Prefix:  "_rexec",
-		Session: makeRexecSession(t, a.Agent, ""),
-	}
-	defer destroySession(t, a.Agent, event.Session, "")
+		spec := &remoteExecSpec{
+			Command: command,
+			Wait:    time.Second,
+		}
+		buf, err := json.Marshal(spec)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		key := "_rexec/" + event.Session + "/job"
+		setKV(t, a.Agent, key, buf, "")
 
-	spec := &remoteExecSpec{
-		Command: command,
-		Wait:    time.Second,
-	}
-	buf, err := json.Marshal(spec)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	key := "_rexec/" + event.Session + "/job"
-	setKV(t, a.Agent, key, buf, "")
+		buf, err = json.Marshal(event)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		msg := &UserEvent{
+			ID:      generateUUID(),
+			Payload: buf,
+		}
 
-	buf, err = json.Marshal(event)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	msg := &UserEvent{
-		ID:      generateUUID(),
-		Payload: buf,
-	}
+		// Handle the event...
+		a.handleRemoteExec(msg)
 
-	// Handle the event...
-	a.handleRemoteExec(msg)
+		// Verify we have an ack
+		key = "_rexec/" + event.Session + "/" + a.Config.NodeName + "/ack"
+		d := getKV(t, a.Agent, key, "")
+		if d == nil || d.Session != event.Session {
+			t.Fatalf("bad ack: %#v", d)
+		}
 
-	// Verify we have an ack
-	key = "_rexec/" + event.Session + "/" + a.Config.NodeName + "/ack"
-	d := getKV(t, a.Agent, key, "")
-	if d == nil || d.Session != event.Session {
-		t.Fatalf("bad ack: %#v", d)
-	}
+		// Verify we have output
+		key = "_rexec/" + event.Session + "/" + a.Config.NodeName + "/out/00000"
+		d = getKV(t, a.Agent, key, "")
+		if d == nil || d.Session != event.Session ||
+			!bytes.Contains(d.Value, []byte(expectedSubstring)) {
+			t.Fatalf("bad output: %#v", d)
+		}
 
-	// Verify we have output
-	key = "_rexec/" + event.Session + "/" + a.Config.NodeName + "/out/00000"
-	d = getKV(t, a.Agent, key, "")
-	if d == nil || d.Session != event.Session ||
-		!bytes.Contains(d.Value, []byte(expectedSubstring)) {
-		t.Fatalf("bad output: %#v", d)
-	}
-
-	// Verify we have an exit code
-	key = "_rexec/" + event.Session + "/" + a.Config.NodeName + "/exit"
-	d = getKV(t, a.Agent, key, "")
-	if d == nil || d.Session != event.Session || string(d.Value) != expectedReturnCode {
-		t.Fatalf("bad output: %#v", d)
-	}
+		// Verify we have an exit code
+		key = "_rexec/" + event.Session + "/" + a.Config.NodeName + "/exit"
+		d = getKV(t, a.Agent, key, "")
+		if d == nil || d.Session != event.Session || string(d.Value) != expectedReturnCode {
+			t.Fatalf("bad output: %#v", d)
+		}
+	})
 }
 
 func TestHandleRemoteExec(t *testing.T) {
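The remote-exec helpers above now take the datacenter as an extra argument and only block on leader election when the caller actually configured one. A compressed sketch of that pattern (the helper name and body are illustrative, not part of the patch):

package agent

import (
	"testing"

	"github.com/hashicorp/consul/testrpc"
)

// testWithOptionalWait mirrors the testRemoteExec* helpers: ACL-enabled
// variants pass dc = "dc1" so the helper waits for a leader before creating
// sessions or writing keys; the plain variant passes "".
func testWithOptionalWait(t *testing.T, hcl, dc string) {
	a := NewTestAgent(t.Name(), hcl)
	defer a.Shutdown()

	if dc != "" {
		testrpc.WaitForLeader(t, a.RPC, dc)
	}

	// ... build the remote-exec event and assert on the KV results ...
}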
"Behavior": structs.SessionKeysDelete, + } + enc.Encode(raw) - req, _ := http.NewRequest("PUT", "/v1/session/create", body) - resp := httptest.NewRecorder() - obj, err := a.srv.SessionCreate(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } + req, _ := http.NewRequest("PUT", "/v1/session/create", body) + resp := httptest.NewRecorder() + obj, err := a.srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } - want := structs.Session{ - ID: obj.(sessionCreateResponse).ID, - Name: "my-cool-session", - Node: a.Config.NodeName, - Checks: []types.CheckID{structs.SerfCheckID, "consul"}, - LockDelay: 20 * time.Second, - Behavior: structs.SessionKeysDelete, - } - verifySession(t, a, want) + want := structs.Session{ + ID: obj.(sessionCreateResponse).ID, + Name: "my-cool-session", + Node: a.Config.NodeName, + Checks: []types.CheckID{structs.SerfCheckID, "consul"}, + LockDelay: 20 * time.Second, + Behavior: structs.SessionKeysDelete, + } + verifySession(t, a, want) + }) } func TestSessionCreate_DefaultCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Associate session with node and 2 health checks body := bytes.NewBuffer(nil) @@ -159,26 +166,29 @@ func TestSessionCreate_DefaultCheck(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/session/create", body) resp := httptest.NewRecorder() - obj, err := a.srv.SessionCreate(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } + retry.Run(t, func(r *retry.R) { + obj, err := a.srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } - want := structs.Session{ - ID: obj.(sessionCreateResponse).ID, - Name: "my-cool-session", - Node: a.Config.NodeName, - Checks: []types.CheckID{structs.SerfCheckID}, - LockDelay: 20 * time.Second, - Behavior: structs.SessionKeysRelease, - } - verifySession(t, a, want) + want := structs.Session{ + ID: obj.(sessionCreateResponse).ID, + Name: "my-cool-session", + Node: a.Config.NodeName, + Checks: []types.CheckID{structs.SerfCheckID}, + LockDelay: 20 * time.Second, + Behavior: structs.SessionKeysRelease, + } + verifySession(t, a, want) + }) } func TestSessionCreate_NoCheck(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") // Associate session with node and 2 health checks body := bytes.NewBuffer(nil) @@ -193,20 +203,22 @@ func TestSessionCreate_NoCheck(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/session/create", body) resp := httptest.NewRecorder() - obj, err := a.srv.SessionCreate(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } + retry.Run(t, func(r *retry.R) { + obj, err := a.srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } - want := structs.Session{ - ID: obj.(sessionCreateResponse).ID, - Name: "my-cool-session", - Node: a.Config.NodeName, - Checks: []types.CheckID{}, - LockDelay: 20 * time.Second, - Behavior: structs.SessionKeysRelease, - } - verifySession(t, a, want) + want := structs.Session{ + ID: obj.(sessionCreateResponse).ID, + Name: "my-cool-session", + Node: a.Config.NodeName, + Checks: []types.CheckID{}, + LockDelay: 20 * time.Second, + Behavior: structs.SessionKeysRelease, + } + verifySession(t, a, want) + }) } func TestFixupLockDelay(t *testing.T) { @@ -295,6 +307,7 @@ func TestSessionDestroy(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestSession(t, a.srv) @@ -316,38 
+329,40 @@ func TestSessionCustomTTL(t *testing.T) { session_ttl_min = "250ms" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + retry.Run(t, func(r *retry.R) { + id := makeTestSessionTTL(t, a.srv, ttl.String()) - id := makeTestSessionTTL(t, a.srv, ttl.String()) + req, _ := http.NewRequest("GET", "/v1/session/info/"+id, nil) + resp := httptest.NewRecorder() + obj, err := a.srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok := obj.(structs.Sessions) + if !ok { + t.Fatalf("should work") + } + if len(respObj) != 1 { + t.Fatalf("bad: %v", respObj) + } + if respObj[0].TTL != ttl.String() { + t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) + } - req, _ := http.NewRequest("GET", "/v1/session/info/"+id, nil) - resp := httptest.NewRecorder() - obj, err := a.srv.SessionGet(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } - respObj, ok := obj.(structs.Sessions) - if !ok { - t.Fatalf("should work") - } - if len(respObj) != 1 { - t.Fatalf("bad: %v", respObj) - } - if respObj[0].TTL != ttl.String() { - t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) - } + time.Sleep(ttl*structs.SessionTTLMultiplier + ttl) - time.Sleep(ttl*structs.SessionTTLMultiplier + ttl) - - req, _ = http.NewRequest("GET", "/v1/session/info/"+id, nil) - resp = httptest.NewRecorder() - obj, err = a.srv.SessionGet(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } - respObj, ok = obj.(structs.Sessions) - if len(respObj) != 0 { - t.Fatalf("session '%s' should have been destroyed", id) - } + req, _ = http.NewRequest("GET", "/v1/session/info/"+id, nil) + resp = httptest.NewRecorder() + obj, err = a.srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok = obj.(structs.Sessions) + if len(respObj) != 0 { + t.Fatalf("session '%s' should have been destroyed", id) + } + }) } func TestSessionTTLRenew(t *testing.T) { @@ -357,6 +372,7 @@ func TestSessionTTLRenew(t *testing.T) { session_ttl_min = "250ms" `) defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestSessionTTL(t, a.srv, ttl.String()) @@ -434,25 +450,29 @@ func TestSessionGet(t *testing.T) { t.Run("", func(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/session/info/adf4238a-882b-9ddc-4a9d-5b6758e4159e", nil) resp := httptest.NewRecorder() - obj, err := a.srv.SessionGet(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } - respObj, ok := obj.(structs.Sessions) - if !ok { - t.Fatalf("should work") - } - if respObj == nil || len(respObj) != 0 { - t.Fatalf("bad: %v", respObj) - } + retry.Run(t, func(r *retry.R) { + obj, err := a.srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok := obj.(structs.Sessions) + if !ok { + t.Fatalf("should work") + } + if respObj == nil || len(respObj) != 0 { + t.Fatalf("bad: %v", respObj) + } + }) }) t.Run("", func(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestSession(t, a.srv) @@ -477,6 +497,7 @@ func TestSessionList(t *testing.T) { t.Run("", func(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/session/list", nil) resp := httptest.NewRecorder() @@ -496,6 +517,7 @@ func TestSessionList(t *testing.T) { t.Run("", func(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") var ids 
[]string for i := 0; i < 10; i++ { @@ -523,6 +545,7 @@ func TestSessionsForNode(t *testing.T) { t.Run("", func(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") req, _ := http.NewRequest("GET", "/v1/session/node/"+a.Config.NodeName, nil) resp := httptest.NewRecorder() @@ -542,6 +565,7 @@ func TestSessionsForNode(t *testing.T) { t.Run("", func(t *testing.T) { a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") var ids []string for i := 0; i < 10; i++ { @@ -568,6 +592,7 @@ func TestSessionDeleteDestroy(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") id := makeTestSessionDelete(t, a.srv) diff --git a/api/catalog_test.go b/api/catalog_test.go index 3cc1bbccb1..e384a001c2 100644 --- a/api/catalog_test.go +++ b/api/catalog_test.go @@ -36,8 +36,8 @@ func TestAPI_CatalogNodes(t *testing.T) { if err != nil { r.Fatal(err) } - if meta.LastIndex == 0 { - r.Fatal("got last index 0 want > 0") + if meta.LastIndex < 2 { + r.Fatal("Last index must be greater than 1") } want := []*Node{ { diff --git a/api/lock_test.go b/api/lock_test.go index 81b47e934f..476f6951d6 100644 --- a/api/lock_test.go +++ b/api/lock_test.go @@ -9,6 +9,8 @@ import ( "sync" "testing" "time" + + "github.com/hashicorp/consul/testutil/retry" ) func TestAPI_LockLockUnlock(t *testing.T) { @@ -74,34 +76,36 @@ func TestAPI_LockForceInvalidate(t *testing.T) { c, s := makeClient(t) defer s.Stop() - lock, err := c.LockKey("test/lock") - if err != nil { - t.Fatalf("err: %v", err) - } + retry.Run(t, func(r *retry.R) { + lock, err := c.LockKey("test/lock") + if err != nil { + t.Fatalf("err: %v", err) + } - // Should work - leaderCh, err := lock.Lock(nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if leaderCh == nil { - t.Fatalf("not leader") - } - defer lock.Unlock() + // Should work + leaderCh, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("not leader") + } + defer lock.Unlock() - go func() { - // Nuke the session, simulator an operator invalidation - // or a health check failure - session := c.Session() - session.Destroy(lock.lockSession, nil) - }() + go func() { + // Nuke the session, simulator an operator invalidation + // or a health check failure + session := c.Session() + session.Destroy(lock.lockSession, nil) + }() - // Should loose leadership - select { - case <-leaderCh: - case <-time.After(time.Second): - t.Fatalf("should not be leader") - } + // Should loose leadership + select { + case <-leaderCh: + case <-time.After(time.Second): + t.Fatalf("should not be leader") + } + }) } func TestAPI_LockDeleteKey(t *testing.T) { diff --git a/api/operator_autopilot_test.go b/api/operator_autopilot_test.go index 9b84b9b4e7..7b4a86b0dd 100644 --- a/api/operator_autopilot_test.go +++ b/api/operator_autopilot_test.go @@ -41,44 +41,46 @@ func TestAPI_OperatorAutopilotCASConfiguration(t *testing.T) { c, s := makeClient(t) defer s.Stop() - operator := c.Operator() - config, err := operator.AutopilotGetConfiguration(nil) - if err != nil { - t.Fatalf("err: %v", err) - } - if !config.CleanupDeadServers { - t.Fatalf("bad: %v", config) - } - - // Pass an invalid ModifyIndex - { - newConf := &AutopilotConfiguration{ - CleanupDeadServers: false, - ModifyIndex: config.ModifyIndex - 1, - } - resp, err := operator.AutopilotCASConfiguration(newConf, nil) + retry.Run(t, func(r *retry.R) { + operator := c.Operator() + config, err 
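The api package tests above (and the autopilot test that follows) take a coarser approach: the whole act-and-assert block goes inside retry.Run, so a request that races leader election is simply retried rather than failing the run. A sketch of that shape against the KV endpoint (the test name and key are illustrative):

package api

import (
	"testing"

	"github.com/hashicorp/consul/testutil/retry"
)

func TestExample_RetryWholeBlock(t *testing.T) {
	c, s := makeClient(t)
	defer s.Stop()

	retry.Run(t, func(r *retry.R) {
		// Everything in this closure re-runs on failure.
		kv := c.KV()
		if _, err := kv.Put(&KVPair{Key: "test/retry", Value: []byte("ok")}, nil); err != nil {
			r.Fatalf("err: %v", err)
		}
		pair, _, err := kv.Get("test/retry", nil)
		if err != nil {
			r.Fatalf("err: %v", err)
		}
		if pair == nil {
			r.Fatalf("missing key test/retry")
		}
	})
}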
diff --git a/api/operator_autopilot_test.go b/api/operator_autopilot_test.go
index 9b84b9b4e7..7b4a86b0dd 100644
--- a/api/operator_autopilot_test.go
+++ b/api/operator_autopilot_test.go
@@ -41,44 +41,46 @@ func TestAPI_OperatorAutopilotCASConfiguration(t *testing.T) {
 	c, s := makeClient(t)
 	defer s.Stop()
 
-	operator := c.Operator()
-	config, err := operator.AutopilotGetConfiguration(nil)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if !config.CleanupDeadServers {
-		t.Fatalf("bad: %v", config)
-	}
-
-	// Pass an invalid ModifyIndex
-	{
-		newConf := &AutopilotConfiguration{
-			CleanupDeadServers: false,
-			ModifyIndex:        config.ModifyIndex - 1,
-		}
-		resp, err := operator.AutopilotCASConfiguration(newConf, nil)
+	retry.Run(t, func(r *retry.R) {
+		operator := c.Operator()
+		config, err := operator.AutopilotGetConfiguration(nil)
 		if err != nil {
 			t.Fatalf("err: %v", err)
 		}
-		if resp {
-			t.Fatalf("bad: %v", resp)
+		if !config.CleanupDeadServers {
+			t.Fatalf("bad: %v", config)
 		}
-	}
 
-	// Pass a valid ModifyIndex
-	{
-		newConf := &AutopilotConfiguration{
-			CleanupDeadServers: false,
-			ModifyIndex:        config.ModifyIndex,
+		// Pass an invalid ModifyIndex
+		{
+			newConf := &AutopilotConfiguration{
+				CleanupDeadServers: false,
+				ModifyIndex:        config.ModifyIndex - 1,
+			}
+			resp, err := operator.AutopilotCASConfiguration(newConf, nil)
+			if err != nil {
+				t.Fatalf("err: %v", err)
+			}
+			if resp {
+				t.Fatalf("bad: %v", resp)
+			}
 		}
-		resp, err := operator.AutopilotCASConfiguration(newConf, nil)
-		if err != nil {
-			t.Fatalf("err: %v", err)
+
+		// Pass a valid ModifyIndex
+		{
+			newConf := &AutopilotConfiguration{
+				CleanupDeadServers: false,
+				ModifyIndex:        config.ModifyIndex,
+			}
+			resp, err := operator.AutopilotCASConfiguration(newConf, nil)
+			if err != nil {
+				t.Fatalf("err: %v", err)
+			}
+			if !resp {
+				t.Fatalf("bad: %v", resp)
+			}
 		}
-		if !resp {
-			t.Fatalf("bad: %v", resp)
-		}
-	}
+	})
 }
 
 func TestAPI_OperatorAutopilotServerHealth(t *testing.T) {
diff --git a/command/catalog/list/nodes/catalog_list_nodes_test.go b/command/catalog/list/nodes/catalog_list_nodes_test.go
index 69ce5f893d..1d46a16a8b 100644
--- a/command/catalog/list/nodes/catalog_list_nodes_test.go
+++ b/command/catalog/list/nodes/catalog_list_nodes_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 
 	"github.com/hashicorp/consul/agent"
+	"github.com/hashicorp/consul/testrpc"
 	"github.com/mitchellh/cli"
 )
 
@@ -34,6 +35,7 @@ func TestCatalogListNodesCommand(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	t.Run("simple", func(t *testing.T) {
 		ui := cli.NewMockUi()
 		c := New(ui)
diff --git a/command/connect/ca/set/connect_ca_set_test.go b/command/connect/ca/set/connect_ca_set_test.go
index 095d21d172..bd8abd61cd 100644
--- a/command/connect/ca/set/connect_ca_set_test.go
+++ b/command/connect/ca/set/connect_ca_set_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/hashicorp/consul/agent"
 	"github.com/hashicorp/consul/agent/connect/ca"
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/testrpc"
 	"github.com/mitchellh/cli"
 )
 
@@ -26,6 +27,7 @@ func TestConnectCASetConfigCommand(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
 
 	ui := cli.NewMockUi()
 	c := New(ui)
 	args := []string{
diff --git a/command/exec/exec_test.go b/command/exec/exec_test.go
index b7d7179fe1..b379a89769 100644
--- a/command/exec/exec_test.go
+++ b/command/exec/exec_test.go
@@ -148,7 +148,6 @@ func TestExecCommand_Sessions(t *testing.T) {
 	ui := cli.NewMockUi()
 	c := New(ui, nil)
 	c.apiclient = a.Client()
-
 	id, err := c.createSession()
 	if err != nil {
 		t.Fatalf("err: %v", err)
@@ -235,9 +234,9 @@ func TestExecCommand_UploadDestroy(t *testing.T) {
 	defer a.Shutdown()
 
 	ui := cli.NewMockUi()
+
 	c := New(ui, nil)
 	c.apiclient = a.Client()
-
 	id, err := c.createSession()
 	if err != nil {
 		t.Fatalf("err: %v", err)
diff --git a/command/lock/lock_test.go b/command/lock/lock_test.go
index 0856e65187..b04be5e875 100644
--- a/command/lock/lock_test.go
+++ b/command/lock/lock_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/consul/agent"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/testrpc"
 	"github.com/mitchellh/cli"
 )
 
@@ -44,6 +45,8 @@ func TestLockCommand(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -67,6 +70,8 @@ func TestLockCommand_NoShell(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -90,6 +95,8 @@ func TestLockCommand_TryLock(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -122,6 +129,8 @@ func TestLockCommand_TrySemaphore(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -154,6 +163,8 @@ func TestLockCommand_MonitorRetry_Lock_Default(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -187,6 +198,8 @@ func TestLockCommand_MonitorRetry_Semaphore_Default(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -220,6 +233,8 @@ func TestLockCommand_MonitorRetry_Lock_Arg(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -253,6 +268,8 @@ func TestLockCommand_MonitorRetry_Semaphore_Arg(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	ui := cli.NewMockUi()
 	c := New(ui)
 
@@ -286,6 +303,8 @@ func TestLockCommand_ChildExitCode(t *testing.T) {
 	a := agent.NewTestAgent(t.Name(), ``)
 	defer a.Shutdown()
 
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
 	t.Run("clean exit", func(t *testing.T) {
 		ui := cli.NewMockUi()
 		c := New(ui)
diff --git a/testrpc/wait.go b/testrpc/wait.go
index ba0c4afb42..becee5d7e1 100644
--- a/testrpc/wait.go
+++ b/testrpc/wait.go
@@ -20,8 +20,8 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string) {
 		if !out.QueryMeta.KnownLeader {
 			r.Fatalf("No leader")
 		}
-		if out.Index == 0 {
-			r.Fatalf("Consul index is 0")
+		if out.Index < 2 {
+			r.Fatalf("Consul index should be at least 2")
 		}
 	})
}