add general runstep test helper instead of copying it all over the place (#13013)

This commit is contained in:
R.B. Boyer 2022-05-10 15:25:51 -05:00 committed by GitHub
parent 11b12885f3
commit 0d6d16ddfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 243 additions and 236 deletions

View File

@ -6205,13 +6205,6 @@ func TestAgentConnectCALeafCert_goodNotLocal(t *testing.T) {
func TestAgentConnectCALeafCert_nonBlockingQuery_after_blockingQuery_shouldNotBlock(t *testing.T) { func TestAgentConnectCALeafCert_nonBlockingQuery_after_blockingQuery_shouldNotBlock(t *testing.T) {
// see: https://github.com/hashicorp/consul/issues/12048 // see: https://github.com/hashicorp/consul/issues/12048
runStep := func(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
@ -6246,7 +6239,7 @@ func TestAgentConnectCALeafCert_nonBlockingQuery_after_blockingQuery_shouldNotBl
index string index string
issued structs.IssuedCert issued structs.IssuedCert
) )
runStep(t, "do initial non-blocking query", func(t *testing.T) { testutil.RunStep(t, "do initial non-blocking query", func(t *testing.T) {
req := httptest.NewRequest("GET", "/v1/agent/connect/ca/leaf/test", nil) req := httptest.NewRequest("GET", "/v1/agent/connect/ca/leaf/test", nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
a.srv.h.ServeHTTP(resp, req) a.srv.h.ServeHTTP(resp, req)
@ -6278,7 +6271,7 @@ func TestAgentConnectCALeafCert_nonBlockingQuery_after_blockingQuery_shouldNotBl
// in between both of these steps the data should still be there, causing // in between both of these steps the data should still be there, causing
// this to be a HIT that completes in less than 10m (the default inner leaf // this to be a HIT that completes in less than 10m (the default inner leaf
// cert blocking query timeout). // cert blocking query timeout).
runStep(t, "do a non-blocking query that should not block", func(t *testing.T) { testutil.RunStep(t, "do a non-blocking query that should not block", func(t *testing.T) {
req := httptest.NewRequest("GET", "/v1/agent/connect/ca/leaf/test", nil) req := httptest.NewRequest("GET", "/v1/agent/connect/ca/leaf/test", nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
a.srv.h.ServeHTTP(resp, req) a.srv.h.ServeHTTP(resp, req)

View File

@ -3944,14 +3944,14 @@ func TestACLResolver_ResolveToken_UpdatesPurgeTheCache(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &reqToken, &respToken) err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &reqToken, &respToken)
require.NoError(t, err) require.NoError(t, err)
runStep(t, "first resolve", func(t *testing.T) { testutil.RunStep(t, "first resolve", func(t *testing.T) {
authz, err := srv.ACLResolver.ResolveToken(token) authz, err := srv.ACLResolver.ResolveToken(token)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, authz) require.NotNil(t, authz)
require.Equal(t, acl.Allow, authz.KeyRead("foo", nil)) require.Equal(t, acl.Allow, authz.KeyRead("foo", nil))
}) })
runStep(t, "update the policy and resolve again", func(t *testing.T) { testutil.RunStep(t, "update the policy and resolve again", func(t *testing.T) {
reqPolicy := structs.ACLPolicySetRequest{ reqPolicy := structs.ACLPolicySetRequest{
Datacenter: "dc1", Datacenter: "dc1",
Policy: structs.ACLPolicy{ Policy: structs.ACLPolicy{
@ -3970,7 +3970,7 @@ func TestACLResolver_ResolveToken_UpdatesPurgeTheCache(t *testing.T) {
require.Equal(t, acl.Deny, authz.KeyRead("foo", nil)) require.Equal(t, acl.Deny, authz.KeyRead("foo", nil))
}) })
runStep(t, "delete the token", func(t *testing.T) { testutil.RunStep(t, "delete the token", func(t *testing.T) {
req := structs.ACLTokenDeleteRequest{ req := structs.ACLTokenDeleteRequest{
Datacenter: "dc1", Datacenter: "dc1",
TokenID: respToken.AccessorID, TokenID: respToken.AccessorID,

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
) )
@ -48,7 +49,7 @@ func TestConfigEntry_Apply(t *testing.T) {
// wait for cross-dc queries to work // wait for cross-dc queries to work
testrpc.WaitForLeader(t, s2.RPC, "dc1") testrpc.WaitForLeader(t, s2.RPC, "dc1")
runStep(t, "send the apply request to dc2 - it should get forwarded to dc1", func(t *testing.T) { testutil.RunStep(t, "send the apply request to dc2 - it should get forwarded to dc1", func(t *testing.T) {
updated := &structs.ServiceConfigEntry{ updated := &structs.ServiceConfigEntry{
Name: "foo", Name: "foo",
} }
@ -62,7 +63,7 @@ func TestConfigEntry_Apply(t *testing.T) {
}) })
var originalModifyIndex uint64 var originalModifyIndex uint64
runStep(t, "verify the entry was updated in the primary and secondary", func(t *testing.T) { testutil.RunStep(t, "verify the entry was updated in the primary and secondary", func(t *testing.T) {
// the previous RPC should not return until the primary has been updated but will return // the previous RPC should not return until the primary has been updated but will return
// before the secondary has the data. // before the secondary has the data.
_, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) _, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
@ -83,7 +84,7 @@ func TestConfigEntry_Apply(t *testing.T) {
originalModifyIndex = serviceConf.ModifyIndex originalModifyIndex = serviceConf.ModifyIndex
}) })
runStep(t, "update the entry again in the primary", func(t *testing.T) { testutil.RunStep(t, "update the entry again in the primary", func(t *testing.T) {
updated := &structs.ServiceConfigEntry{ updated := &structs.ServiceConfigEntry{
Name: "foo", Name: "foo",
MeshGateway: structs.MeshGatewayConfig{ MeshGateway: structs.MeshGatewayConfig{
@ -97,12 +98,12 @@ func TestConfigEntry_Apply(t *testing.T) {
Entry: updated, Entry: updated,
} }
runStep(t, "with the wrong CAS", func(t *testing.T) { testutil.RunStep(t, "with the wrong CAS", func(t *testing.T) {
var out bool var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
require.False(t, out) require.False(t, out)
}) })
runStep(t, "with the correct CAS", func(t *testing.T) { testutil.RunStep(t, "with the correct CAS", func(t *testing.T) {
var out bool var out bool
args.Entry.GetRaftIndex().ModifyIndex = originalModifyIndex args.Entry.GetRaftIndex().ModifyIndex = originalModifyIndex
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
@ -110,7 +111,7 @@ func TestConfigEntry_Apply(t *testing.T) {
}) })
}) })
runStep(t, "verify the entry was updated in the state store", func(t *testing.T) { testutil.RunStep(t, "verify the entry was updated in the state store", func(t *testing.T) {
_, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) _, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(t, err) require.NoError(t, err)
@ -122,10 +123,10 @@ func TestConfigEntry_Apply(t *testing.T) {
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
}) })
runStep(t, "verify no-op updates do not advance the raft indexes", func(t *testing.T) { testutil.RunStep(t, "verify no-op updates do not advance the raft indexes", func(t *testing.T) {
var modifyIndex uint64 var modifyIndex uint64
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
runStep(t, fmt.Sprintf("iteration %d", i), func(t *testing.T) { testutil.RunStep(t, fmt.Sprintf("iteration %d", i), func(t *testing.T) {
args := structs.ConfigEntryRequest{ args := structs.ConfigEntryRequest{
Datacenter: "dc1", Datacenter: "dc1",
Op: structs.ConfigEntryUpsert, Op: structs.ConfigEntryUpsert,
@ -329,7 +330,7 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
require.True(t, out) require.True(t, out)
} }
runStep(t, "test the errNotFound path", func(t *testing.T) { testutil.RunStep(t, "test the errNotFound path", func(t *testing.T) {
rpcBlockingQueryTestHarness(t, rpcBlockingQueryTestHarness(t,
func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) {
args := structs.ConfigEntryQuery{ args := structs.ConfigEntryQuery{
@ -508,7 +509,7 @@ func TestConfigEntry_List_BlockOnNoChange(t *testing.T) {
) )
} }
runStep(t, "test the errNotFound path", func(t *testing.T) { testutil.RunStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other") run(t, "other")
}) })
@ -531,7 +532,7 @@ func TestConfigEntry_List_BlockOnNoChange(t *testing.T) {
} }
} }
runStep(t, "test the errNotChanged path", func(t *testing.T) { testutil.RunStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other") run(t, "completely-different-other")
}) })
} }
@ -801,7 +802,7 @@ func TestConfigEntry_Delete(t *testing.T) {
// wait for cross-dc queries to work // wait for cross-dc queries to work
testrpc.WaitForLeader(t, s2.RPC, "dc1") testrpc.WaitForLeader(t, s2.RPC, "dc1")
runStep(t, "create a dummy service in the state store to look up", func(t *testing.T) { testutil.RunStep(t, "create a dummy service in the state store to look up", func(t *testing.T) {
entry := &structs.ServiceConfigEntry{ entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults, Kind: structs.ServiceDefaults,
Name: "foo", Name: "foo",
@ -809,7 +810,7 @@ func TestConfigEntry_Delete(t *testing.T) {
require.NoError(t, s1.fsm.State().EnsureConfigEntry(1, entry)) require.NoError(t, s1.fsm.State().EnsureConfigEntry(1, entry))
}) })
runStep(t, "verify it exists in the primary and is replicated to the secondary", func(t *testing.T) { testutil.RunStep(t, "verify it exists in the primary and is replicated to the secondary", func(t *testing.T) {
// Verify it's there. // Verify it's there.
_, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) _, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(t, err) require.NoError(t, err)
@ -827,7 +828,7 @@ func TestConfigEntry_Delete(t *testing.T) {
}) })
}) })
runStep(t, "send the delete request to dc2 - it should get forwarded to dc1", func(t *testing.T) { testutil.RunStep(t, "send the delete request to dc2 - it should get forwarded to dc1", func(t *testing.T) {
args := structs.ConfigEntryRequest{ args := structs.ConfigEntryRequest{
Datacenter: "dc2", Datacenter: "dc2",
Entry: &structs.ServiceConfigEntry{ Entry: &structs.ServiceConfigEntry{
@ -840,7 +841,7 @@ func TestConfigEntry_Delete(t *testing.T) {
require.True(t, out.Deleted) require.True(t, out.Deleted)
}) })
runStep(t, "verify the entry was deleted in the primary and secondary", func(t *testing.T) { testutil.RunStep(t, "verify the entry was deleted in the primary and secondary", func(t *testing.T) {
// Verify the entry was deleted. // Verify the entry was deleted.
_, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil) _, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
require.NoError(t, err) require.NoError(t, err)
@ -854,7 +855,7 @@ func TestConfigEntry_Delete(t *testing.T) {
}) })
}) })
runStep(t, "delete in dc1 again - should be fine", func(t *testing.T) { testutil.RunStep(t, "delete in dc1 again - should be fine", func(t *testing.T) {
args := structs.ConfigEntryRequest{ args := structs.ConfigEntryRequest{
Datacenter: "dc1", Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{ Entry: &structs.ServiceConfigEntry{
@ -1809,7 +1810,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
var index uint64 var index uint64
runStep(t, "foo and bar should be both http", func(t *testing.T) { testutil.RunStep(t, "foo and bar should be both http", func(t *testing.T) {
// Verify that we get the results of service-defaults for 'foo' and 'bar'. // Verify that we get the results of service-defaults for 'foo' and 'bar'.
var out structs.ServiceConfigResponse var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
@ -1843,7 +1844,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
index = out.Index index = out.Index
}) })
runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) { testutil.RunStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) {
// Now setup a blocking query for 'foo' while we erase the // Now setup a blocking query for 'foo' while we erase the
// service-defaults for bar. // service-defaults for bar.
@ -1896,7 +1897,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
index = out.Index index = out.Index
}) })
runStep(t, "foo should be http and bar should be unset", func(t *testing.T) { testutil.RunStep(t, "foo should be http and bar should be unset", func(t *testing.T) {
// Verify that we get the results of service-defaults for just 'foo'. // Verify that we get the results of service-defaults for just 'foo'.
var out structs.ServiceConfigResponse var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
@ -1922,7 +1923,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
index = out.Index index = out.Index
}) })
runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) { testutil.RunStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) {
// Now setup a blocking query for 'foo' while we erase the // Now setup a blocking query for 'foo' while we erase the
// service-defaults for foo. // service-defaults for foo.
@ -2183,7 +2184,7 @@ func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) {
require.True(t, out) require.True(t, out)
} }
runStep(t, "test the errNotFound path", func(t *testing.T) { testutil.RunStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other") run(t, "other")
}) })
@ -2199,7 +2200,7 @@ func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) {
require.True(t, out) require.True(t, out)
} }
runStep(t, "test the errNotChanged path", func(t *testing.T) { testutil.RunStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other") run(t, "completely-different-other")
}) })
} }
@ -2343,11 +2344,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) {
require.Equal(t, expose, proxyConf.Expose) require.Equal(t, expose, proxyConf.Expose)
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }
func Test_gateWriteToSecondary(t *testing.T) { func Test_gateWriteToSecondary(t *testing.T) {

View File

@ -442,7 +442,7 @@ func TestConnectCAConfig_TriggerRotation(t *testing.T) {
// Make sure the new root has been added along with an intermediate // Make sure the new root has been added along with an intermediate
// cross-signed by the old root. // cross-signed by the old root.
var newRootPEM string var newRootPEM string
runStep(t, "ensure roots look correct", func(t *testing.T) { testutil.RunStep(t, "ensure roots look correct", func(t *testing.T) {
args := &structs.DCSpecificRequest{ args := &structs.DCSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
} }
@ -483,7 +483,7 @@ func TestConnectCAConfig_TriggerRotation(t *testing.T) {
} }
}) })
runStep(t, "verify the new config was set", func(t *testing.T) { testutil.RunStep(t, "verify the new config was set", func(t *testing.T) {
args := &structs.DCSpecificRequest{ args := &structs.DCSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
} }
@ -498,7 +498,7 @@ func TestConnectCAConfig_TriggerRotation(t *testing.T) {
assert.Equal(t, actual, expected) assert.Equal(t, actual, expected)
}) })
runStep(t, "verify that new leaf certs get the cross-signed intermediate bundled", func(t *testing.T) { testutil.RunStep(t, "verify that new leaf certs get the cross-signed intermediate bundled", func(t *testing.T) {
// Generate a CSR and request signing // Generate a CSR and request signing
spiffeId := connect.TestSpiffeIDService(t, "web") spiffeId := connect.TestSpiffeIDService(t, "web")
csr, _ := connect.TestCSR(t, spiffeId) csr, _ := connect.TestCSR(t, spiffeId)
@ -509,7 +509,7 @@ func TestConnectCAConfig_TriggerRotation(t *testing.T) {
var reply structs.IssuedCert var reply structs.IssuedCert
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply)) require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply))
runStep(t, "verify that the cert is signed by the new CA", func(t *testing.T) { testutil.RunStep(t, "verify that the cert is signed by the new CA", func(t *testing.T) {
roots := x509.NewCertPool() roots := x509.NewCertPool()
require.True(t, roots.AppendCertsFromPEM([]byte(newRootPEM))) require.True(t, roots.AppendCertsFromPEM([]byte(newRootPEM)))
leaf, err := connect.ParseCert(reply.CertPEM) leaf, err := connect.ParseCert(reply.CertPEM)
@ -520,7 +520,7 @@ func TestConnectCAConfig_TriggerRotation(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
}) })
runStep(t, "and that it validates via the intermediate", func(t *testing.T) { testutil.RunStep(t, "and that it validates via the intermediate", func(t *testing.T) {
roots := x509.NewCertPool() roots := x509.NewCertPool()
assert.True(t, roots.AppendCertsFromPEM([]byte(oldRoot.RootCert))) assert.True(t, roots.AppendCertsFromPEM([]byte(oldRoot.RootCert)))
leaf, err := connect.ParseCert(reply.CertPEM) leaf, err := connect.ParseCert(reply.CertPEM)
@ -540,7 +540,7 @@ func TestConnectCAConfig_TriggerRotation(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
}) })
runStep(t, "verify other fields", func(t *testing.T) { testutil.RunStep(t, "verify other fields", func(t *testing.T) {
assert.Equal(t, "web", reply.Service) assert.Equal(t, "web", reply.Service)
assert.Equal(t, spiffeId.URI().String(), reply.ServiceURI) assert.Equal(t, spiffeId.URI().String(), reply.ServiceURI)
}) })

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
) )
@ -313,7 +314,7 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) {
) )
} }
runStep(t, "test the errNotFound path", func(t *testing.T) { testutil.RunStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other") run(t, "other")
}) })
@ -329,7 +330,7 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) {
require.True(t, out) require.True(t, out)
} }
runStep(t, "test the errNotChanged path", func(t *testing.T) { testutil.RunStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other") run(t, "completely-different-other")
}) })
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
@ -694,7 +695,7 @@ func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) {
register(t, "web", "foo") register(t, "web", "foo")
var lastIndex uint64 var lastIndex uint64
runStep(t, "read original", func(t *testing.T) { testutil.RunStep(t, "read original", func(t *testing.T) {
var out structs.IndexedCheckServiceNodes var out structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{ req := structs.ServiceSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -715,7 +716,7 @@ func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) {
lastIndex = out.Index lastIndex = out.Index
}) })
runStep(t, "read blocking query result", func(t *testing.T) { testutil.RunStep(t, "read blocking query result", func(t *testing.T) {
req := structs.ServiceSpecificRequest{ req := structs.ServiceSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
ServiceName: "web", ServiceName: "web",

View File

@ -1802,7 +1802,7 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) {
) )
} }
runStep(t, "test the errNotFound path", func(t *testing.T) { testutil.RunStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other", 0) run(t, "other", 0)
}) })
@ -1830,7 +1830,7 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) {
} }
} }
runStep(t, "test the errNotChanged path", func(t *testing.T) { testutil.RunStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other", 2) run(t, "completely-different-other", 2)
}) })
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
@ -2385,7 +2386,7 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) {
) )
} }
runStep(t, "test the errNotFound path", func(t *testing.T) { testutil.RunStep(t, "test the errNotFound path", func(t *testing.T) {
run(t, "other", 0) run(t, "other", 0)
}) })
@ -2398,7 +2399,7 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) {
// web -> api (allow) // web -> api (allow)
registerIntentionUpstreamEntries(t, codec, "") registerIntentionUpstreamEntries(t, codec, "")
runStep(t, "test the errNotChanged path", func(t *testing.T) { testutil.RunStep(t, "test the errNotChanged path", func(t *testing.T) {
run(t, "completely-different-other", 1) run(t, "completely-different-other", 1)
}) })
} }

View File

@ -58,7 +58,7 @@ func TestCAManager_Initialize_Vault_Secondary_SharedVault(t *testing.T) {
} }
}) })
runStep(t, "check primary DC", func(t *testing.T) { testutil.RunStep(t, "check primary DC", func(t *testing.T) {
testrpc.WaitForTestAgent(t, serverDC1.RPC, "dc1") testrpc.WaitForTestAgent(t, serverDC1.RPC, "dc1")
codec := rpcClient(t, serverDC1) codec := rpcClient(t, serverDC1)
@ -71,7 +71,7 @@ func TestCAManager_Initialize_Vault_Secondary_SharedVault(t *testing.T) {
verifyLeafCert(t, roots.Roots[0], leafPEM) verifyLeafCert(t, roots.Roots[0], leafPEM)
}) })
runStep(t, "start secondary DC", func(t *testing.T) { testutil.RunStep(t, "start secondary DC", func(t *testing.T) {
_, serverDC2 := testServerWithConfig(t, func(c *Config) { _, serverDC2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2" c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1" c.PrimaryDatacenter = "dc1"
@ -647,7 +647,7 @@ func TestCAManager_Initialize_Vault_WithIntermediateAsPrimaryCA(t *testing.T) {
} }
}) })
runStep(t, "check primary DC", func(t *testing.T) { testutil.RunStep(t, "check primary DC", func(t *testing.T) {
testrpc.WaitForTestAgent(t, s1.RPC, "dc1") testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1) codec := rpcClient(t, s1)
@ -664,7 +664,7 @@ func TestCAManager_Initialize_Vault_WithIntermediateAsPrimaryCA(t *testing.T) {
// TODO: renew primary leaf signing cert // TODO: renew primary leaf signing cert
// TODO: rotate root // TODO: rotate root
runStep(t, "run secondary DC", func(t *testing.T) { testutil.RunStep(t, "run secondary DC", func(t *testing.T) {
_, sDC2 := testServerWithConfig(t, func(c *Config) { _, sDC2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2" c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1" c.PrimaryDatacenter = "dc1"
@ -797,7 +797,7 @@ func TestCAManager_Initialize_Vault_WithExternalTrustedCA(t *testing.T) {
var origLeaf string var origLeaf string
roots := structs.IndexedCARoots{} roots := structs.IndexedCARoots{}
runStep(t, "verify primary DC", func(t *testing.T) { testutil.RunStep(t, "verify primary DC", func(t *testing.T) {
codec := rpcClient(t, serverDC1) codec := rpcClient(t, serverDC1)
err := msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", &structs.DCSpecificRequest{}, &roots) err := msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", &structs.DCSpecificRequest{}, &roots)
require.NoError(t, err) require.NoError(t, err)
@ -825,7 +825,7 @@ func TestCAManager_Initialize_Vault_WithExternalTrustedCA(t *testing.T) {
}) })
var origLeafSecondary string var origLeafSecondary string
runStep(t, "start secondary DC", func(t *testing.T) { testutil.RunStep(t, "start secondary DC", func(t *testing.T) {
joinWAN(t, serverDC2, serverDC1) joinWAN(t, serverDC2, serverDC1)
testrpc.WaitForActiveCARoot(t, serverDC2.RPC, "dc2", nil) testrpc.WaitForActiveCARoot(t, serverDC2.RPC, "dc2", nil)
@ -840,7 +840,7 @@ func TestCAManager_Initialize_Vault_WithExternalTrustedCA(t *testing.T) {
origLeafSecondary = leafPEM origLeafSecondary = leafPEM
}) })
runStep(t, "renew leaf signing CA in primary", func(t *testing.T) { testutil.RunStep(t, "renew leaf signing CA in primary", func(t *testing.T) {
previous := serverDC1.caManager.getLeafSigningCertFromRoot(roots.Active()) previous := serverDC1.caManager.getLeafSigningCertFromRoot(roots.Active())
renewLeafSigningCert(t, serverDC1.caManager, serverDC1.caManager.primaryRenewIntermediate) renewLeafSigningCert(t, serverDC1.caManager, serverDC1.caManager.primaryRenewIntermediate)
@ -862,7 +862,7 @@ func TestCAManager_Initialize_Vault_WithExternalTrustedCA(t *testing.T) {
verifyLeafCert(t, roots.Roots[0], origLeaf) verifyLeafCert(t, roots.Roots[0], origLeaf)
}) })
runStep(t, "renew leaf signing CA in secondary", func(t *testing.T) { testutil.RunStep(t, "renew leaf signing CA in secondary", func(t *testing.T) {
previous := serverDC2.caManager.getLeafSigningCertFromRoot(roots.Active()) previous := serverDC2.caManager.getLeafSigningCertFromRoot(roots.Active())
renewLeafSigningCert(t, serverDC2.caManager, serverDC2.caManager.secondaryRequestNewSigningCert) renewLeafSigningCert(t, serverDC2.caManager, serverDC2.caManager.secondaryRequestNewSigningCert)
@ -885,7 +885,7 @@ func TestCAManager_Initialize_Vault_WithExternalTrustedCA(t *testing.T) {
verifyLeafCert(t, roots.Roots[0], origLeaf) verifyLeafCert(t, roots.Roots[0], origLeaf)
}) })
runStep(t, "rotate root by changing the provider", func(t *testing.T) { testutil.RunStep(t, "rotate root by changing the provider", func(t *testing.T) {
codec := rpcClient(t, serverDC1) codec := rpcClient(t, serverDC1)
req := &structs.CARequest{ req := &structs.CARequest{
Op: structs.CAOpSetConfig, Op: structs.CAOpSetConfig,
@ -919,7 +919,7 @@ func TestCAManager_Initialize_Vault_WithExternalTrustedCA(t *testing.T) {
verifyLeafCertWithRoots(t, rootsSecondary, origLeafSecondary) verifyLeafCertWithRoots(t, rootsSecondary, origLeafSecondary)
}) })
runStep(t, "rotate to a different external root", func(t *testing.T) { testutil.RunStep(t, "rotate to a different external root", func(t *testing.T) {
setupPrimaryCA(t, vclient, "pki-primary-2/", rootPEM) setupPrimaryCA(t, vclient, "pki-primary-2/", rootPEM)
codec := rpcClient(t, serverDC1) codec := rpcClient(t, serverDC1)

View File

@ -83,7 +83,7 @@ func TestConnectCA_ConfigurationSet_ChangeKeyConfig_Primary(t *testing.T) {
require.Equal(r, src.keyBits, caRoot.PrivateKeyBits) require.Equal(r, src.keyBits, caRoot.PrivateKeyBits)
}) })
runStep(t, "sign leaf cert and make sure chain is correct", func(t *testing.T) { testutil.RunStep(t, "sign leaf cert and make sure chain is correct", func(t *testing.T) {
spiffeService := &connect.SpiffeIDService{ spiffeService := &connect.SpiffeIDService{
Host: "node1", Host: "node1",
Namespace: "default", Namespace: "default",
@ -103,14 +103,14 @@ func TestConnectCA_ConfigurationSet_ChangeKeyConfig_Primary(t *testing.T) {
require.NoError(t, connect.ValidateLeaf(caRoot.RootCert, leafPEM, []string{})) require.NoError(t, connect.ValidateLeaf(caRoot.RootCert, leafPEM, []string{}))
}) })
runStep(t, "verify persisted state is correct", func(t *testing.T) { testutil.RunStep(t, "verify persisted state is correct", func(t *testing.T) {
state := srv.fsm.State() state := srv.fsm.State()
_, caConfig, err := state.CAConfig(nil) _, caConfig, err := state.CAConfig(nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, providerState, caConfig.State) require.Equal(t, providerState, caConfig.State)
}) })
runStep(t, "change roots", func(t *testing.T) { testutil.RunStep(t, "change roots", func(t *testing.T) {
// Update a config value // Update a config value
newConfig := &structs.CAConfiguration{ newConfig := &structs.CAConfiguration{
Provider: "consul", Provider: "consul",
@ -145,7 +145,7 @@ func TestConnectCA_ConfigurationSet_ChangeKeyConfig_Primary(t *testing.T) {
require.Equal(r, dst.keyBits, newCaRoot.PrivateKeyBits) require.Equal(r, dst.keyBits, newCaRoot.PrivateKeyBits)
}) })
runStep(t, "sign leaf cert and make sure NEW chain is correct", func(t *testing.T) { testutil.RunStep(t, "sign leaf cert and make sure NEW chain is correct", func(t *testing.T) {
spiffeService := &connect.SpiffeIDService{ spiffeService := &connect.SpiffeIDService{
Host: "node1", Host: "node1",
Namespace: "default", Namespace: "default",
@ -165,7 +165,7 @@ func TestConnectCA_ConfigurationSet_ChangeKeyConfig_Primary(t *testing.T) {
require.NoError(t, connect.ValidateLeaf(newCaRoot.RootCert, leafPEM, []string{})) require.NoError(t, connect.ValidateLeaf(newCaRoot.RootCert, leafPEM, []string{}))
}) })
runStep(t, "verify persisted state is still correct", func(t *testing.T) { testutil.RunStep(t, "verify persisted state is still correct", func(t *testing.T) {
state := srv.fsm.State() state := srv.fsm.State()
_, caConfig, err := state.CAConfig(nil) _, caConfig, err := state.CAConfig(nil)
require.NoError(t, err) require.NoError(t, err)

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
) )
@ -82,7 +83,7 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) {
peeringClient := pbpeering.NewPeeringServiceClient(conn) peeringClient := pbpeering.NewPeeringServiceClient(conn)
runStep(t, "forward a write", func(t *testing.T) { testutil.RunStep(t, "forward a write", func(t *testing.T) {
// Do the grpc Write call to server2 // Do the grpc Write call to server2
req := pbpeering.GenerateTokenRequest{ req := pbpeering.GenerateTokenRequest{
Datacenter: "dc1", Datacenter: "dc1",

View File

@ -1145,7 +1145,7 @@ func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
runStep(t, "Register a dummy node with a service", func(t *testing.T) { testutil.RunStep(t, "Register a dummy node with a service", func(t *testing.T) {
req := &structs.RegisterRequest{ req := &structs.RegisterRequest{
Node: "node1", Node: "node1",
Address: "3.4.5.6", Address: "3.4.5.6",
@ -1183,7 +1183,7 @@ func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
} }
// Try to use it locally (it should work) // Try to use it locally (it should work)
runStep(t, "token used locally should work", func(t *testing.T) { testutil.RunStep(t, "token used locally should work", func(t *testing.T) {
arg := &pbsubscribe.SubscribeRequest{ arg := &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
@ -1198,7 +1198,7 @@ func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
require.Equal(t, localToken2.SecretID, arg.Token, "token should not be stripped") require.Equal(t, localToken2.SecretID, arg.Token, "token should not be stripped")
}) })
runStep(t, "token used remotely should not work", func(t *testing.T) { testutil.RunStep(t, "token used remotely should not work", func(t *testing.T) {
arg := &pbsubscribe.SubscribeRequest{ arg := &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
@ -1216,7 +1216,7 @@ func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
require.True(t, event.Payload.(*pbsubscribe.Event_EndOfSnapshot).EndOfSnapshot) require.True(t, event.Payload.(*pbsubscribe.Event_EndOfSnapshot).EndOfSnapshot)
}) })
runStep(t, "update anonymous token to read services", func(t *testing.T) { testutil.RunStep(t, "update anonymous token to read services", func(t *testing.T) {
tokenUpsertReq := structs.ACLTokenSetRequest{ tokenUpsertReq := structs.ACLTokenSetRequest{
Datacenter: "dc1", Datacenter: "dc1",
ACLToken: structs.ACLToken{ ACLToken: structs.ACLToken{
@ -1233,7 +1233,7 @@ func TestRPC_LocalTokenStrippedOnForward_GRPC(t *testing.T) {
require.NotEmpty(t, token.SecretID) require.NotEmpty(t, token.SecretID)
}) })
runStep(t, "token used remotely should fallback on anonymous token now", func(t *testing.T) { testutil.RunStep(t, "token used remotely should fallback on anonymous token now", func(t *testing.T) {
arg := &pbsubscribe.SubscribeRequest{ arg := &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",

View File

@ -340,7 +340,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
require.Equal(t, checks, out) require.Equal(t, checks, out)
} }
runStep(t, "add a node", func(t *testing.T) { testutil.RunStep(t, "add a node", func(t *testing.T) {
req := makeReq(nil) req := makeReq(nil)
require.NoError(t, s.EnsureRegistration(1, req)) require.NoError(t, s.EnsureRegistration(1, req))
@ -348,7 +348,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
verifyNode(t) verifyNode(t)
}) })
runStep(t, "add a node with invalid meta", func(t *testing.T) { testutil.RunStep(t, "add a node with invalid meta", func(t *testing.T) {
// Add in a invalid service definition with too long Key value for Meta // Add in a invalid service definition with too long Key value for Meta
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
@ -365,7 +365,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
}) })
// Add in a service definition. // Add in a service definition.
runStep(t, "add a service definition", func(t *testing.T) { testutil.RunStep(t, "add a service definition", func(t *testing.T) {
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
ID: "redis1", ID: "redis1",
@ -385,7 +385,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
}) })
// Add in a top-level check. // Add in a top-level check.
runStep(t, "add a top level check", func(t *testing.T) { testutil.RunStep(t, "add a top level check", func(t *testing.T) {
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
ID: "redis1", ID: "redis1",
@ -413,7 +413,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
// Add a service check which should populate the ServiceName // Add a service check which should populate the ServiceName
// and ServiceTags fields in the response. // and ServiceTags fields in the response.
runStep(t, "add a service check", func(t *testing.T) { testutil.RunStep(t, "add a service check", func(t *testing.T) {
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
ID: "redis1", ID: "redis1",
@ -449,7 +449,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
}) })
// Try to register a check for some other node (top-level check). // Try to register a check for some other node (top-level check).
runStep(t, "try to register a check for some other node via the top level check", func(t *testing.T) { testutil.RunStep(t, "try to register a check for some other node via the top level check", func(t *testing.T) {
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
ID: "redis1", ID: "redis1",
@ -482,7 +482,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
verifyChecks(t) verifyChecks(t)
}) })
runStep(t, "try to register a check for some other node via the checks array", func(t *testing.T) { testutil.RunStep(t, "try to register a check for some other node via the checks array", func(t *testing.T) {
// Try to register a check for some other node (checks array). // Try to register a check for some other node (checks array).
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
@ -626,7 +626,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Start with just a node. // Start with just a node.
runStep(t, "add a node", func(t *testing.T) { testutil.RunStep(t, "add a node", func(t *testing.T) {
req := makeReq(nil) req := makeReq(nil)
restore := s.Restore() restore := s.Restore()
require.NoError(t, restore.Registration(1, req)) require.NoError(t, restore.Registration(1, req))
@ -638,7 +638,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
}) })
// Add in a service definition. // Add in a service definition.
runStep(t, "add a service definition", func(t *testing.T) { testutil.RunStep(t, "add a service definition", func(t *testing.T) {
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
ID: "redis1", ID: "redis1",
@ -664,7 +664,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
verifyService(t, s, nodeName) verifyService(t, s, nodeName)
}) })
runStep(t, "add a top-level check", func(t *testing.T) { testutil.RunStep(t, "add a top-level check", func(t *testing.T) {
// Add in a top-level check. // Add in a top-level check.
// //
// Verify that node name references in checks are case-insensitive during // Verify that node name references in checks are case-insensitive during
@ -705,7 +705,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
verifyCheck(t, s) verifyCheck(t, s)
}) })
runStep(t, "add another check via the slice", func(t *testing.T) { testutil.RunStep(t, "add another check via the slice", func(t *testing.T) {
// Add in another check via the slice. // Add in another check via the slice.
req := makeReq(func(req *structs.RegisterRequest) { req := makeReq(func(req *structs.RegisterRequest) {
req.Service = &structs.NodeService{ req.Service = &structs.NodeService{
@ -8146,11 +8146,10 @@ func TestStateStore_EnsureService_ServiceNames(t *testing.T) {
require.Empty(t, got) require.Empty(t, got)
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }
func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) { func assertMaxIndexes(t *testing.T, tx ReadTxn, expect map[string]uint64, skip ...string) {

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/sdk/testutil"
) )
// TODO(partitions): test partitioned nodes here // TODO(partitions): test partitioned nodes here
@ -254,7 +255,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
// the read side. // the read side.
require.Equal(t, append(updates, badUpdate), dump) require.Equal(t, append(updates, badUpdate), dump)
runStep(t, "restore the values into a new state store", func(t *testing.T) { testutil.RunStep(t, "restore the values into a new state store", func(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
restore := s.Restore() restore := s.Restore()
require.NoError(t, restore.Coordinates(6, dump)) require.NoError(t, restore.Coordinates(6, dump))

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil"
) )
func insertTestPeerings(t *testing.T, s *Store) { func insertTestPeerings(t *testing.T, s *Store) {
@ -643,14 +644,14 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
runStep(t, "no exported services", func(t *testing.T) { testutil.RunStep(t, "no exported services", func(t *testing.T) {
idx, exported, err := s.ExportedServicesForPeer(ws, id) idx, exported, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, lastIdx, idx) require.Equal(t, lastIdx, idx)
require.Empty(t, exported) require.Empty(t, exported)
}) })
runStep(t, "config entry with exact service names", func(t *testing.T) { testutil.RunStep(t, "config entry with exact service names", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{ entry := &structs.ExportedServicesConfigEntry{
Name: "default", Name: "default",
Services: []structs.ExportedService{ Services: []structs.ExportedService{
@ -703,7 +704,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.ElementsMatch(t, expect, got) require.ElementsMatch(t, expect, got)
}) })
runStep(t, "config entry with wildcard service name picks up existing service", func(t *testing.T) { testutil.RunStep(t, "config entry with wildcard service name picks up existing service", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{Node: "foo", Address: "127.0.0.1"})) require.NoError(t, s.EnsureNode(lastIdx, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
@ -742,7 +743,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.Equal(t, expect, got) require.Equal(t, expect, got)
}) })
runStep(t, "config entry with wildcard service names picks up new registrations", func(t *testing.T) { testutil.RunStep(t, "config entry with wildcard service names picks up new registrations", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "payments", Service: "payments", Port: 5000})) require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{ID: "payments", Service: "payments", Port: 5000}))
@ -778,7 +779,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.ElementsMatch(t, expect, got) require.ElementsMatch(t, expect, got)
}) })
runStep(t, "config entry with wildcard service names picks up service deletions", func(t *testing.T) { testutil.RunStep(t, "config entry with wildcard service names picks up service deletions", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, s.DeleteService(lastIdx, "foo", "billing", nil, "")) require.NoError(t, s.DeleteService(lastIdx, "foo", "billing", nil, ""))
@ -801,7 +802,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
require.ElementsMatch(t, expect, got) require.ElementsMatch(t, expect, got)
}) })
runStep(t, "deleting the config entry clears exported services", func(t *testing.T) { testutil.RunStep(t, "deleting the config entry clears exported services", func(t *testing.T) {
require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", structs.DefaultEnterpriseMetaInDefaultPartition())) require.NoError(t, s.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", structs.DefaultEnterpriseMetaInDefaultPartition()))
idx, exported, err := s.ExportedServicesForPeer(ws, id) idx, exported, err := s.ExportedServicesForPeer(ws, id)
require.NoError(t, err) require.NoError(t, err)
@ -997,7 +998,7 @@ func TestStateStore_PeeringsForService(t *testing.T) {
} }
for _, tc := range cases { for _, tc := range cases {
runStep(t, tc.name, func(t *testing.T) { testutil.RunStep(t, tc.name, func(t *testing.T) {
run(t, tc) run(t, tc)
}) })
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/sdk/testutil"
) )
type intTopic int type intTopic int
@ -254,7 +255,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
// splicing the topic buffer onto the snapshot. // splicing the topic buffer onto the snapshot.
publisher.publishEvent([]Event{testSnapshotEvent}) publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) { testutil.RunStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req) sub, err := publisher.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)
defer sub.Unsubscribe() defer sub.Unsubscribe()
@ -269,7 +270,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
require.Equal(t, uint64(1), next.Index) require.Equal(t, uint64(1), next.Index)
}) })
runStep(t, "resume the subscription", func(t *testing.T) { testutil.RunStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req newReq := *req
newReq.Index = 1 newReq.Index = 1
sub, err := publisher.Subscribe(&newReq) sub, err := publisher.Subscribe(&newReq)
@ -304,7 +305,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
// Include the same event in the topicBuffer // Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent}) publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) { testutil.RunStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req) sub, err := publisher.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)
defer sub.Unsubscribe() defer sub.Unsubscribe()
@ -325,11 +326,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
Payload: simplePayload{key: "sub-key", value: "event-3"}, Payload: simplePayload{key: "sub-key", value: "event-3"},
} }
runStep(t, "publish an event while unsubed", func(t *testing.T) { testutil.RunStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent}) publisher.publishEvent([]Event{nextEvent})
}) })
runStep(t, "resume the subscription", func(t *testing.T) { testutil.RunStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req newReq := *req
newReq.Index = 1 newReq.Index = 1
sub, err := publisher.Subscribe(&newReq) sub, err := publisher.Subscribe(&newReq)
@ -365,7 +366,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
// splicing the topic buffer onto the snapshot. // splicing the topic buffer onto the snapshot.
publisher.publishEvent([]Event{testSnapshotEvent}) publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) { testutil.RunStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req) sub, err := publisher.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)
defer sub.Unsubscribe() defer sub.Unsubscribe()
@ -386,7 +387,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
Payload: simplePayload{key: "sub-key", value: "event-3"}, Payload: simplePayload{key: "sub-key", value: "event-3"},
} }
runStep(t, "publish an event while unsubed", func(t *testing.T) { testutil.RunStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent}) publisher.publishEvent([]Event{nextEvent})
}) })
@ -394,7 +395,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
return 0, fmt.Errorf("error should not be seen, cache should have been used") return 0, fmt.Errorf("error should not be seen, cache should have been used")
} }
runStep(t, "resume the subscription", func(t *testing.T) { testutil.RunStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req newReq := *req
newReq.Index = 1 newReq.Index = 1
sub, err := publisher.Subscribe(&newReq) sub, err := publisher.Subscribe(&newReq)
@ -452,7 +453,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi
publisher.publishEvent([]Event{testSnapshotEvent}) publisher.publishEvent([]Event{testSnapshotEvent})
publisher.publishEvent([]Event{nextEvent}) publisher.publishEvent([]Event{nextEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) { testutil.RunStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req) sub, err := publisher.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)
defer sub.Unsubscribe() defer sub.Unsubscribe()
@ -476,7 +477,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi
return 0, fmt.Errorf("error should not be seen, cache should have been used") return 0, fmt.Errorf("error should not be seen, cache should have been used")
} }
runStep(t, "resume the subscription", func(t *testing.T) { testutil.RunStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req newReq := *req
newReq.Index = 0 newReq.Index = 0
sub, err := publisher.Subscribe(&newReq) sub, err := publisher.Subscribe(&newReq)
@ -494,11 +495,10 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi
}) })
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }
func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) { func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {

View File

@ -28,6 +28,7 @@ import (
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
@ -63,7 +64,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
ids := newCounter() ids := newCounter()
var req *structs.RegisterRequest var req *structs.RegisterRequest
runStep(t, "register two instances of the redis service", func(t *testing.T) { testutil.RunStep(t, "register two instances of the redis service", func(t *testing.T) {
req = &structs.RegisterRequest{ req = &structs.RegisterRequest{
Node: "node1", Node: "node1",
Address: "3.4.5.6", Address: "3.4.5.6",
@ -91,7 +92,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
}) })
runStep(t, "register a service by a different name", func(t *testing.T) { testutil.RunStep(t, "register a service by a different name", func(t *testing.T) {
req := &structs.RegisterRequest{ req := &structs.RegisterRequest{
Node: "other", Node: "other",
Address: "2.3.4.5", Address: "2.3.4.5",
@ -116,7 +117,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
chEvents := make(chan eventOrError, 0) chEvents := make(chan eventOrError, 0)
var snapshotEvents []*pbsubscribe.Event var snapshotEvents []*pbsubscribe.Event
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { testutil.RunStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
@ -131,7 +132,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
} }
}) })
runStep(t, "receive the initial snapshot of events", func(t *testing.T) { testutil.RunStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{ expected := []*pbsubscribe.Event{
{ {
Index: ids.For("reg3"), Index: ids.For("reg3"),
@ -207,7 +208,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
prototest.AssertDeepEqual(t, expected, snapshotEvents) prototest.AssertDeepEqual(t, expected, snapshotEvents)
}) })
runStep(t, "update the registration by adding a check", func(t *testing.T) { testutil.RunStep(t, "update the registration by adding a check", func(t *testing.T) {
req.Check = &structs.HealthCheck{ req.Check = &structs.HealthCheck{
Node: "node2", Node: "node2",
CheckID: "check1", CheckID: "check1",
@ -440,7 +441,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
ids := newCounter() ids := newCounter()
var req *structs.RegisterRequest var req *structs.RegisterRequest
runStep(t, "register three services", func(t *testing.T) { testutil.RunStep(t, "register three services", func(t *testing.T) {
req = &structs.RegisterRequest{ req = &structs.RegisterRequest{
Node: "other", Node: "other",
Address: "2.3.4.5", Address: "2.3.4.5",
@ -486,7 +487,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
chEvents := make(chan eventOrError, 0) chEvents := make(chan eventOrError, 0)
var snapshotEvents []*pbsubscribe.Event var snapshotEvents []*pbsubscribe.Event
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { testutil.RunStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
@ -502,7 +503,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
} }
}) })
runStep(t, "receive the initial snapshot of events", func(t *testing.T) { testutil.RunStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{ expected := []*pbsubscribe.Event{
{ {
Index: ids.Last(), Index: ids.Last(),
@ -578,7 +579,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
prototest.AssertDeepEqual(t, expected, snapshotEvents) prototest.AssertDeepEqual(t, expected, snapshotEvents)
}) })
runStep(t, "update the registration by adding a check", func(t *testing.T) { testutil.RunStep(t, "update the registration by adding a check", func(t *testing.T) {
req.Check = &structs.HealthCheck{ req.Check = &structs.HealthCheck{
Node: "node2", Node: "node2",
CheckID: types.CheckID("check1"), CheckID: types.CheckID("check1"),
@ -657,7 +658,7 @@ func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testi
addr := runTestServer(t, NewServer(backend, hclog.New(nil))) addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
token := "this-token-is-good" token := "this-token-is-good"
runStep(t, "create an ACL policy", func(t *testing.T) { testutil.RunStep(t, "create an ACL policy", func(t *testing.T) {
rules := ` rules := `
service "foo" { service "foo" {
policy = "write" policy = "write"
@ -684,7 +685,7 @@ node "node1" {
ids := newCounter() ids := newCounter()
var req *structs.RegisterRequest var req *structs.RegisterRequest
runStep(t, "register services", func(t *testing.T) { testutil.RunStep(t, "register services", func(t *testing.T) {
req = &structs.RegisterRequest{ req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
@ -743,7 +744,7 @@ node "node1" {
chEvents := make(chan eventOrError, 0) chEvents := make(chan eventOrError, 0)
runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) { testutil.RunStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) {
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "foo", Key: "foo",
@ -761,7 +762,7 @@ node "node1" {
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
}) })
runStep(t, "update the service to receive an event", func(t *testing.T) { testutil.RunStep(t, "update the service to receive an event", func(t *testing.T) {
req = &structs.RegisterRequest{ req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
@ -788,7 +789,7 @@ node "node1" {
require.Equal(t, int32(1234), service.Port) require.Equal(t, int32(1234), service.Port)
}) })
runStep(t, "updates to the service on the denied node, should not send an event", func(t *testing.T) { testutil.RunStep(t, "updates to the service on the denied node, should not send an event", func(t *testing.T) {
req = &structs.RegisterRequest{ req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "denied", Node: "denied",
@ -812,7 +813,7 @@ node "node1" {
assertNoEvents(t, chEvents) assertNoEvents(t, chEvents)
}) })
runStep(t, "subscribe to a topic where events are not visible", func(t *testing.T) { testutil.RunStep(t, "subscribe to a topic where events are not visible", func(t *testing.T) {
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "bar", Key: "bar",
@ -853,7 +854,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) {
addr := runTestServer(t, NewServer(backend, hclog.New(nil))) addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
token := "this-token-is-good" token := "this-token-is-good"
runStep(t, "create an ACL policy", func(t *testing.T) { testutil.RunStep(t, "create an ACL policy", func(t *testing.T) {
rules := ` rules := `
service "foo" { service "foo" {
policy = "write" policy = "write"
@ -886,7 +887,7 @@ node "node1" {
chEvents := make(chan eventOrError, 0) chEvents := make(chan eventOrError, 0)
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { testutil.RunStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
@ -899,7 +900,7 @@ node "node1" {
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
}) })
runStep(t, "updates to the token should close the stream", func(t *testing.T) { testutil.RunStep(t, "updates to the token should close the stream", func(t *testing.T) {
tokenID, err := uuid.GenerateUUID() tokenID, err := uuid.GenerateUUID()
require.NoError(t, err) require.NoError(t, err)
@ -940,11 +941,10 @@ func logError(t *testing.T, f func() error) func() {
} }
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }
func TestNewEventFromSteamEvent(t *testing.T) { func TestNewEventFromSteamEvent(t *testing.T) {

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
@ -958,13 +959,6 @@ use_streaming_backend = true
}, },
} }
runStep := func(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
register := func(t *testing.T, a *TestAgent, name, tag string) { register := func(t *testing.T, a *TestAgent, name, tag string) {
args := &structs.RegisterRequest{ args := &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -998,7 +992,7 @@ use_streaming_backend = true
// Initial request with a filter should return one. // Initial request with a filter should return one.
var lastIndex uint64 var lastIndex uint64
runStep(t, "read original", func(t *testing.T) { testutil.RunStep(t, "read original", func(t *testing.T) {
req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil) req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil)
require.NoError(t, err) require.NoError(t, err)
@ -1024,7 +1018,7 @@ use_streaming_backend = true
}) })
const timeout = 30 * time.Second const timeout = 30 * time.Second
runStep(t, "read blocking query result", func(t *testing.T) { testutil.RunStep(t, "read blocking query result", func(t *testing.T) {
var ( var (
// out and resp are not safe to read until reading from errCh // out and resp are not safe to read until reading from errCh
out structs.CheckServiceNodes out structs.CheckServiceNodes

View File

@ -153,7 +153,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
err := client.Send(sub) err := client.Send(sub)
require.NoError(t, err) require.NoError(t, err)
runStep(t, "new stream gets tracked", func(t *testing.T) { testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(peerID) status, ok := srv.StreamStatus(peerID)
require.True(r, ok) require.True(r, ok)
@ -175,7 +175,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
} }
prototest.AssertDeepEqual(t, expect, receivedSub) prototest.AssertDeepEqual(t, expect, receivedSub)
runStep(t, "terminate the stream", func(t *testing.T) { testutil.RunStep(t, "terminate the stream", func(t *testing.T) {
done := srv.ConnectedStreams()[peerID] done := srv.ConnectedStreams()[peerID]
close(done) close(done)
@ -228,7 +228,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
err := client.Send(sub) err := client.Send(sub)
require.NoError(t, err) require.NoError(t, err)
runStep(t, "new stream gets tracked", func(t *testing.T) { testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
status, ok := srv.StreamStatus(peerID) status, ok := srv.StreamStatus(peerID)
require.True(r, ok) require.True(r, ok)
@ -236,7 +236,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}) })
}) })
runStep(t, "client receives initial subscription", func(t *testing.T) { testutil.RunStep(t, "client receives initial subscription", func(t *testing.T) {
ack, err := client.Recv() ack, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
@ -255,7 +255,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
var sequence uint64 var sequence uint64
var lastSendSuccess time.Time var lastSendSuccess time.Time
runStep(t, "ack tracked as success", func(t *testing.T) { testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
ack := &pbpeering.ReplicationMessage{ ack := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{ Request: &pbpeering.ReplicationMessage_Request{
@ -288,7 +288,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
var lastNack time.Time var lastNack time.Time
var lastNackMsg string var lastNackMsg string
runStep(t, "nack tracked as error", func(t *testing.T) { testutil.RunStep(t, "nack tracked as error", func(t *testing.T) {
nack := &pbpeering.ReplicationMessage{ nack := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{ Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{ Request: &pbpeering.ReplicationMessage_Request{
@ -325,7 +325,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
var lastRecvSuccess time.Time var lastRecvSuccess time.Time
runStep(t, "response applied locally", func(t *testing.T) { testutil.RunStep(t, "response applied locally", func(t *testing.T) {
resp := &pbpeering.ReplicationMessage{ resp := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Response_{ Payload: &pbpeering.ReplicationMessage_Response_{
Response: &pbpeering.ReplicationMessage_Response{ Response: &pbpeering.ReplicationMessage_Response{
@ -373,7 +373,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
var lastRecvError time.Time var lastRecvError time.Time
var lastRecvErrorMsg string var lastRecvErrorMsg string
runStep(t, "response fails to apply locally", func(t *testing.T) { testutil.RunStep(t, "response fails to apply locally", func(t *testing.T) {
resp := &pbpeering.ReplicationMessage{ resp := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Response_{ Payload: &pbpeering.ReplicationMessage_Response_{
Response: &pbpeering.ReplicationMessage_Response{ Response: &pbpeering.ReplicationMessage_Response{
@ -427,7 +427,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}) })
}) })
runStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) { testutil.RunStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) {
client.Close() client.Close()
sequence++ sequence++
@ -533,7 +533,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service)) require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
runStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) { testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{ entry := &structs.ExportedServicesConfigEntry{
Name: "default", Name: "default",
Services: []structs.ExportedService{ Services: []structs.ExportedService{
@ -577,7 +577,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000}, Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
} }
runStep(t, "registering mongo instance leads to an UPSERT event", func(t *testing.T) { testutil.RunStep(t, "registering mongo instance leads to an UPSERT event", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node)) require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
@ -596,7 +596,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
}) })
}) })
runStep(t, "un-exporting mysql leads to a DELETE event for mysql", func(t *testing.T) { testutil.RunStep(t, "un-exporting mysql leads to a DELETE event for mysql", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{ entry := &structs.ExportedServicesConfigEntry{
Name: "default", Name: "default",
Services: []structs.ExportedService{ Services: []structs.ExportedService{
@ -623,7 +623,7 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
}) })
}) })
runStep(t, "deleting the config entry leads to a DELETE event for mongo", func(t *testing.T) { testutil.RunStep(t, "deleting the config entry leads to a DELETE event for mongo", func(t *testing.T) {
lastIdx++ lastIdx++
err = store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", nil) err = store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", nil)
require.NoError(t, err) require.NoError(t, err)

View File

@ -5,6 +5,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -22,7 +23,7 @@ func TestStreamTracker_EnsureConnectedDisconnected(t *testing.T) {
err error err error
) )
runStep(t, "new stream", func(t *testing.T) { testutil.RunStep(t, "new stream", func(t *testing.T) {
statusPtr, err = tracker.connected(peerID) statusPtr, err = tracker.connected(peerID)
require.NoError(t, err) require.NoError(t, err)
@ -35,7 +36,7 @@ func TestStreamTracker_EnsureConnectedDisconnected(t *testing.T) {
require.Equal(t, expect, status) require.Equal(t, expect, status)
}) })
runStep(t, "duplicate gets rejected", func(t *testing.T) { testutil.RunStep(t, "duplicate gets rejected", func(t *testing.T) {
_, err := tracker.connected(peerID) _, err := tracker.connected(peerID)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), `there is an active stream for the given PeerID "63b60245-c475-426b-b314-4588d210859d"`) require.Contains(t, err.Error(), `there is an active stream for the given PeerID "63b60245-c475-426b-b314-4588d210859d"`)
@ -44,7 +45,7 @@ func TestStreamTracker_EnsureConnectedDisconnected(t *testing.T) {
var sequence uint64 var sequence uint64
var lastSuccess time.Time var lastSuccess time.Time
runStep(t, "stream updated", func(t *testing.T) { testutil.RunStep(t, "stream updated", func(t *testing.T) {
statusPtr.trackAck() statusPtr.trackAck()
sequence++ sequence++
@ -59,7 +60,7 @@ func TestStreamTracker_EnsureConnectedDisconnected(t *testing.T) {
require.Equal(t, expect, status) require.Equal(t, expect, status)
}) })
runStep(t, "disconnect", func(t *testing.T) { testutil.RunStep(t, "disconnect", func(t *testing.T) {
tracker.disconnected(peerID) tracker.disconnected(peerID)
sequence++ sequence++
@ -73,7 +74,7 @@ func TestStreamTracker_EnsureConnectedDisconnected(t *testing.T) {
require.Equal(t, expect, status) require.Equal(t, expect, status)
}) })
runStep(t, "re-connect", func(t *testing.T) { testutil.RunStep(t, "re-connect", func(t *testing.T) {
_, err := tracker.connected(peerID) _, err := tracker.connected(peerID)
require.NoError(t, err) require.NoError(t, err)
@ -89,7 +90,7 @@ func TestStreamTracker_EnsureConnectedDisconnected(t *testing.T) {
require.Equal(t, expect, status) require.Equal(t, expect, status)
}) })
runStep(t, "delete", func(t *testing.T) { testutil.RunStep(t, "delete", func(t *testing.T) {
tracker.deleteStatus(peerID) tracker.deleteStatus(peerID)
status, ok := tracker.streamStatus(peerID) status, ok := tracker.streamStatus(peerID)

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
) )
@ -85,7 +86,7 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
}, },
} }
runStep(t, "registering exported service instance yields update", func(t *testing.T) { testutil.RunStep(t, "registering exported service instance yields update", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql1.Node)) require.NoError(t, store.EnsureNode(lastIdx, mysql1.Node))
@ -125,7 +126,7 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
}, },
} }
runStep(t, "additional instances are returned when registered", func(t *testing.T) { testutil.RunStep(t, "additional instances are returned when registered", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql2.Node)) require.NoError(t, store.EnsureNode(lastIdx, mysql2.Node))
@ -161,7 +162,7 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
}) })
}) })
runStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) { testutil.RunStep(t, "no updates are received for services not exported to my-peering", func(t *testing.T) {
mongo := &structs.CheckServiceNode{ mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000}, Service: &structs.NodeService{ID: "mongo", Service: "mongo", Port: 5000},
@ -193,7 +194,7 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
} }
}) })
runStep(t, "deregister an instance and it gets removed from the output", func(t *testing.T) { testutil.RunStep(t, "deregister an instance and it gets removed from the output", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, store.DeleteService(lastIdx, "foo", mysql1.Service.ID, nil, "")) require.NoError(t, store.DeleteService(lastIdx, "foo", mysql1.Service.ID, nil, ""))
@ -215,7 +216,7 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
} }
}) })
runStep(t, "deregister the last instance and the output is empty", func(t *testing.T) { testutil.RunStep(t, "deregister the last instance and the output is empty", func(t *testing.T) {
lastIdx++ lastIdx++
require.NoError(t, store.DeleteService(lastIdx, "bar", mysql2.Service.ID, nil, "")) require.NoError(t, store.DeleteService(lastIdx, "bar", mysql2.Service.ID, nil, ""))
@ -295,7 +296,7 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
// Expect this to fire // Expect this to fire
} }
runStep(t, "exporting the two services yields an update for both", func(t *testing.T) { testutil.RunStep(t, "exporting the two services yields an update for both", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{ entry := &structs.ExportedServicesConfigEntry{
Name: "default", Name: "default",
Services: []structs.ExportedService{ Services: []structs.ExportedService{

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil"
) )
// same certificate that appears in our connect tests // same certificate that appears in our connect tests
@ -192,9 +193,8 @@ func (t *incrementalTime) Now() time.Time {
return t.base.Add(time.Duration(t.next) * time.Second) return t.base.Add(time.Duration(t.next) * time.Second)
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
@ -107,7 +108,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
}, },
} }
runStep(t, "empty snapshot returned", func(t *testing.T) { testutil.RunStep(t, "empty snapshot returned", func(t *testing.T) {
result, err := store.Get(ctx, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
@ -117,7 +118,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "blocks for timeout", func(t *testing.T) { testutil.RunStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout // Subsequent fetch should block for the timeout
start := time.Now() start := time.Now()
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
@ -135,7 +136,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
var lastResultValue structs.CheckServiceNodes var lastResultValue structs.CheckServiceNodes
runStep(t, "blocks until update", func(t *testing.T) { testutil.RunStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update // Make another blocking query with a longer timeout and trigger an update
// event part way through. // event part way through.
start := time.Now() start := time.Now()
@ -161,7 +162,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) { testutil.RunStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
streamClient.QueueErr(tempError("broken pipe")) streamClient.QueueErr(tempError("broken pipe"))
// Next fetch will continue to block until timeout and receive the same // Next fetch will continue to block until timeout and receive the same
@ -200,7 +201,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) { testutil.RunStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting // Wait and send the error while fetcher is waiting
go func() { go func() {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
@ -285,7 +286,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
streamClient: client, streamClient: client,
} }
runStep(t, "full snapshot returned", func(t *testing.T) { testutil.RunStep(t, "full snapshot returned", func(t *testing.T) {
result, err := store.Get(ctx, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
@ -297,7 +298,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "blocks until deregistration", func(t *testing.T) { testutil.RunStep(t, "blocks until deregistration", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update // Make another blocking query with a longer timeout and trigger an update
// event part way through. // event part way through.
start := time.Now() start := time.Now()
@ -325,7 +326,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "server reload is respected", func(t *testing.T) { testutil.RunStep(t, "server reload is respected", func(t *testing.T) {
// Simulates the server noticing the request's ACL token privs changing. To // Simulates the server noticing the request's ACL token privs changing. To
// detect this we'll queue up the new snapshot as a different set of nodes // detect this we'll queue up the new snapshot as a different set of nodes
// to the first. // to the first.
@ -355,7 +356,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) { testutil.RunStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
client.QueueErr(tempError("temporary connection error")) client.QueueErr(tempError("temporary connection error"))
client.QueueEvents( client.QueueEvents(
@ -430,7 +431,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
streamClient: client, streamClient: client,
} }
runStep(t, "full snapshot returned", func(t *testing.T) { testutil.RunStep(t, "full snapshot returned", func(t *testing.T) {
result, err := store.Get(ctx, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
@ -442,7 +443,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "batched updates work too", func(t *testing.T) { testutil.RunStep(t, "batched updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same // Simulate multiple registrations happening in one Txn (so all have same
// index) // index)
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
@ -499,7 +500,7 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
batchEv, batchEv,
newEndOfSnapshotEvent(5)) newEndOfSnapshotEvent(5))
runStep(t, "filtered snapshot returned", func(t *testing.T) { testutil.RunStep(t, "filtered snapshot returned", func(t *testing.T) {
result, err := store.Get(ctx, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
@ -511,7 +512,7 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "filtered updates work too", func(t *testing.T) { testutil.RunStep(t, "filtered updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (all have same index) // Simulate multiple registrations happening in one Txn (all have same index)
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
// Deregister an existing node // Deregister an existing node
@ -666,11 +667,10 @@ func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) er
} }
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }
func TestNewFilterEvaluator(t *testing.T) { func TestNewFilterEvaluator(t *testing.T) {

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
) )
@ -32,7 +33,7 @@ func TestStore_Get(t *testing.T) {
newEventServiceHealthRegister(10, 1, "srv1"), newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(22, 2, "srv1")) newEventServiceHealthRegister(22, 2, "srv1"))
runStep(t, "from empty store, starts materializer", func(t *testing.T) { testutil.RunStep(t, "from empty store, starts materializer", func(t *testing.T) {
var result Result var result Result
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
var err error var err error
@ -56,7 +57,7 @@ func TestStore_Get(t *testing.T) {
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
}) })
runStep(t, "with an index that already exists in the view", func(t *testing.T) { testutil.RunStep(t, "with an index that already exists in the view", func(t *testing.T) {
req.index = 21 req.index = 21
result, err := store.Get(ctx, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
@ -84,7 +85,7 @@ func TestStore_Get(t *testing.T) {
chResult <- resultOrError{Result: result, Err: err} chResult <- resultOrError{Result: result, Err: err}
}() }()
runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) { testutil.RunStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) {
select { select {
case <-chResult: case <-chResult:
t.Fatalf("expected Get to block") t.Fatalf("expected Get to block")
@ -97,7 +98,7 @@ func TestStore_Get(t *testing.T) {
require.Equal(t, 1, e.requests) require.Equal(t, 1, e.requests)
}) })
runStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) { testutil.RunStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1")) req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1"))
select { select {
@ -112,7 +113,7 @@ func TestStore_Get(t *testing.T) {
require.Equal(t, 1, e.requests) require.Equal(t, 1, e.requests)
}) })
runStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) { testutil.RunStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1")) req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1"))
var getResult resultOrError var getResult resultOrError
select { select {
@ -139,7 +140,7 @@ func TestStore_Get(t *testing.T) {
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
}) })
runStep(t, "with no index returns latest value", func(t *testing.T) { testutil.RunStep(t, "with no index returns latest value", func(t *testing.T) {
req.index = 0 req.index = 0
result, err := store.Get(ctx, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
@ -160,7 +161,7 @@ func TestStore_Get(t *testing.T) {
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
}) })
runStep(t, "blocks until timeout", func(t *testing.T) { testutil.RunStep(t, "blocks until timeout", func(t *testing.T) {
req.index = 50 req.index = 50
req.timeout = 25 * time.Millisecond req.timeout = 25 * time.Millisecond
@ -304,7 +305,7 @@ func TestStore_Notify(t *testing.T) {
err := store.Notify(ctx, req, cID, ch) err := store.Notify(ctx, req, cID, ch)
require.NoError(t, err) require.NoError(t, err)
runStep(t, "from empty store, starts materializer", func(t *testing.T) { testutil.RunStep(t, "from empty store, starts materializer", func(t *testing.T) {
store.lock.Lock() store.lock.Lock()
defer store.lock.Unlock() defer store.lock.Unlock()
require.Len(t, store.byKey, 1) require.Len(t, store.byKey, 1)
@ -313,7 +314,7 @@ func TestStore_Notify(t *testing.T) {
require.Equal(t, 1, e.requests) require.Equal(t, 1, e.requests)
}) })
runStep(t, "updates are received", func(t *testing.T) { testutil.RunStep(t, "updates are received", func(t *testing.T) {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
select { select {
case update := <-ch: case update := <-ch:
@ -339,7 +340,7 @@ func TestStore_Notify(t *testing.T) {
} }
}) })
runStep(t, "closing the notify starts the expiry counter", func(t *testing.T) { testutil.RunStep(t, "closing the notify starts the expiry counter", func(t *testing.T) {
cancel() cancel()
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -395,7 +396,7 @@ func TestStore_Notify_ManyRequests(t *testing.T) {
var req2 *fakeRPCRequest var req2 *fakeRPCRequest
runStep(t, "Get and Notify with a different key", func(t *testing.T) { testutil.RunStep(t, "Get and Notify with a different key", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -414,7 +415,7 @@ func TestStore_Notify_ManyRequests(t *testing.T) {
}) })
}) })
runStep(t, "end all the requests", func(t *testing.T) { testutil.RunStep(t, "end all the requests", func(t *testing.T) {
req.client.QueueEvents( req.client.QueueEvents(
newEventServiceHealthRegister(10, 1, "srv1"), newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(12, 2, "srv1"), newEventServiceHealthRegister(12, 2, "srv1"),
@ -433,7 +434,7 @@ func TestStore_Notify_ManyRequests(t *testing.T) {
}) })
}) })
runStep(t, "the expiry heap should contain two entries", func(t *testing.T) { testutil.RunStep(t, "the expiry heap should contain two entries", func(t *testing.T) {
store.lock.Lock() store.lock.Lock()
defer store.lock.Unlock() defer store.lock.Unlock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
@ -505,9 +506,8 @@ func TestStore_Run_ExpiresEntries(t *testing.T) {
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index()) require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }

View File

@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/sdk/testutil"
) )
// NOTE: For these tests, prefer not using xDS protobuf "factory" methods if // NOTE: For these tests, prefer not using xDS protobuf "factory" methods if
@ -45,7 +46,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
var snap *proxycfg.ConfigSnapshot var snap *proxycfg.ConfigSnapshot
runStep(t, "initial setup", func(t *testing.T) { testutil.RunStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "") snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover. We'll assume we are testing a partial // Send initial cluster discover. We'll assume we are testing a partial
@ -66,7 +67,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)
}) })
runStep(t, "first sync", func(t *testing.T) { testutil.RunStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType, TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1), Nonce: hexString(1),
@ -163,7 +164,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1] snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1]
} }
runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { testutil.RunStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesUnsubscribe: []string{ ResourceNamesUnsubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
@ -181,7 +182,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
}) })
runStep(t, "restore endpoint subscription", func(t *testing.T) { testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) {
// Fix the snapshot // Fix the snapshot
snap = newTestSnapshot(t, snap, "") snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)
@ -209,7 +210,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
}) })
// NOTE: this has to be the last subtest since it kills the stream // NOTE: this has to be the last subtest since it kills the stream
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) { testutil.RunStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail // Force sends to fail
envoy.SetSendErr(errors.New("test error")) envoy.SetSendErr(errors.New("test error"))
@ -247,7 +248,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
var snap *proxycfg.ConfigSnapshot var snap *proxycfg.ConfigSnapshot
runStep(t, "initial setup", func(t *testing.T) { testutil.RunStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "") snap = newTestSnapshot(t, nil, "")
// Plug in a bad port for the public listener // Plug in a bad port for the public listener
@ -265,7 +266,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)
}) })
runStep(t, "first sync", func(t *testing.T) { testutil.RunStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType, TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1), Nonce: hexString(1),
@ -331,7 +332,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
}) })
runStep(t, "simulate envoy NACKing a listener update", func(t *testing.T) { testutil.RunStep(t, "simulate envoy NACKing a listener update", func(t *testing.T) {
// Correct the port and deliver a new snapshot // Correct the port and deliver a new snapshot
snap.Port = 9999 snap.Port = 9999
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)
@ -390,7 +391,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
}) })
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)
runStep(t, "no-rds", func(t *testing.T) { testutil.RunStep(t, "no-rds", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType, TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1), Nonce: hexString(1),
@ -468,7 +469,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
}) })
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)
runStep(t, "with-rds", func(t *testing.T) { testutil.RunStep(t, "with-rds", func(t *testing.T) {
// Just the "db" listener sees a change // Just the "db" listener sees a change
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType, TypeUrl: xdscommon.ListenerType,
@ -546,7 +547,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
mgr.RegisterProxy(t, sid) mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot var snap *proxycfg.ConfigSnapshot
runStep(t, "get into initial state", func(t *testing.T) { testutil.RunStep(t, "get into initial state", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "") snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover. // Send initial cluster discover.
@ -626,7 +627,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
// Disable hack. Need to wait for one more event to wake up the loop. // Disable hack. Need to wait for one more event to wake up the loop.
atomic.StoreUint32(&slowHackDisabled, 1) atomic.StoreUint32(&slowHackDisabled, 1)
runStep(t, "delayed endpoint update finally comes in", func(t *testing.T) { testutil.RunStep(t, "delayed endpoint update finally comes in", func(t *testing.T) {
// Trigger the xds.Server select{} to wake up and notice our hack is disabled. // Trigger the xds.Server select{} to wake up and notice our hack is disabled.
// The actual contents of this change are irrelevant. // The actual contents of this change are irrelevant.
snap = newTestSnapshot(t, snap, "") snap = newTestSnapshot(t, snap, "")
@ -671,7 +672,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
mgr.RegisterProxy(t, sid) mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot var snap *proxycfg.ConfigSnapshot
runStep(t, "get into initial state", func(t *testing.T) { testutil.RunStep(t, "get into initial state", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "") snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover. // Send initial cluster discover.
@ -746,7 +747,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
}) })
runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) { testutil.RunStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) {
// Update the snapshot in a way that causes a single cluster update. // Update the snapshot in a way that causes a single cluster update.
snap = newTestSnapshot(t, snap, "", &structs.ServiceResolverConfigEntry{ snap = newTestSnapshot(t, snap, "", &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver, Kind: structs.ServiceResolver,
@ -808,7 +809,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
var snap *proxycfg.ConfigSnapshot var snap *proxycfg.ConfigSnapshot
runStep(t, "get into initial state", func(t *testing.T) { testutil.RunStep(t, "get into initial state", func(t *testing.T) {
// Send initial cluster discover (empty payload) // Send initial cluster discover (empty payload)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
@ -908,7 +909,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
}) })
runStep(t, "trigger listener update needing implicit route replacements", func(t *testing.T) { testutil.RunStep(t, "trigger listener update needing implicit route replacements", func(t *testing.T) {
// Update the snapshot in a way that causes a single listener update. // Update the snapshot in a way that causes a single listener update.
// //
// Downgrade from http2 to http // Downgrade from http2 to http

View File

@ -795,11 +795,10 @@ func makeTestRoute(t *testing.T, fixtureName string) *envoy_route_v3.RouteConfig
} }
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }
func requireProtocolVersionGauge( func requireProtocolVersionGauge(

View File

@ -3,6 +3,7 @@ package api
import ( import (
"testing" "testing"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -13,7 +14,7 @@ func TestAPI_ConfigEntries_ExportedServices(t *testing.T) {
entries := c.ConfigEntries() entries := c.ConfigEntries()
runStep(t, "set and get", func(t *testing.T) { testutil.RunStep(t, "set and get", func(t *testing.T) {
exports := &ExportedServicesConfigEntry{ exports := &ExportedServicesConfigEntry{
Name: PartitionDefaultName, Name: PartitionDefaultName,
Partition: defaultPartition, Partition: defaultPartition,
@ -41,7 +42,7 @@ func TestAPI_ConfigEntries_ExportedServices(t *testing.T) {
require.Equal(t, exports, result) require.Equal(t, exports, result)
}) })
runStep(t, "update", func(t *testing.T) { testutil.RunStep(t, "update", func(t *testing.T) {
updated := &ExportedServicesConfigEntry{ updated := &ExportedServicesConfigEntry{
Name: PartitionDefaultName, Name: PartitionDefaultName,
Services: []ExportedService{ Services: []ExportedService{
@ -81,7 +82,7 @@ func TestAPI_ConfigEntries_ExportedServices(t *testing.T) {
require.Equal(t, updated, result) require.Equal(t, updated, result)
}) })
runStep(t, "list", func(t *testing.T) { testutil.RunStep(t, "list", func(t *testing.T) {
entries, qm, err := entries.List(ExportedServices, nil) entries, qm, err := entries.List(ExportedServices, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, qm) require.NotNil(t, qm)
@ -89,7 +90,7 @@ func TestAPI_ConfigEntries_ExportedServices(t *testing.T) {
require.Len(t, entries, 1) require.Len(t, entries, 1)
}) })
runStep(t, "delete", func(t *testing.T) { testutil.RunStep(t, "delete", func(t *testing.T) {
wm, err := entries.Delete(ExportedServices, PartitionDefaultName, nil) wm, err := entries.Delete(ExportedServices, PartitionDefaultName, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, wm) require.NotNil(t, wm)

View File

@ -6,6 +6,8 @@ import (
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil"
) )
func TestAPI_ConfigEntries(t *testing.T) { func TestAPI_ConfigEntries(t *testing.T) {
@ -209,7 +211,7 @@ func TestAPI_ConfigEntries(t *testing.T) {
} }
ce := c.ConfigEntries() ce := c.ConfigEntries()
runStep(t, "set and get", func(t *testing.T) { testutil.RunStep(t, "set and get", func(t *testing.T) {
_, wm, err := ce.Set(mesh, nil) _, wm, err := ce.Set(mesh, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, wm) require.NotNil(t, wm)
@ -229,7 +231,7 @@ func TestAPI_ConfigEntries(t *testing.T) {
require.Equal(t, mesh, result) require.Equal(t, mesh, result)
}) })
runStep(t, "list", func(t *testing.T) { testutil.RunStep(t, "list", func(t *testing.T) {
entries, qm, err := ce.List(MeshConfig, nil) entries, qm, err := ce.List(MeshConfig, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, qm) require.NotNil(t, qm)
@ -237,7 +239,7 @@ func TestAPI_ConfigEntries(t *testing.T) {
require.Len(t, entries, 1) require.Len(t, entries, 1)
}) })
runStep(t, "delete", func(t *testing.T) { testutil.RunStep(t, "delete", func(t *testing.T) {
wm, err := ce.Delete(MeshConfig, MeshConfigMesh, nil) wm, err := ce.Delete(MeshConfig, MeshConfigMesh, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, wm) require.NotNil(t, wm)
@ -281,11 +283,10 @@ func TestAPI_ConfigEntries(t *testing.T) {
}) })
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { testutil.RunStep(t, name, fn)
t.FailNow()
}
} }
func TestDecodeConfigEntry(t *testing.T) { func TestDecodeConfigEntry(t *testing.T) {

View File

@ -6,6 +6,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/sdk/testutil"
) )
var _ heap.Interface = (*entryHeap)(nil) var _ heap.Interface = (*entryHeap)(nil)
@ -18,14 +20,14 @@ func TestExpiryHeap(t *testing.T) {
// Init, shouldn't trigger anything // Init, shouldn't trigger anything
testNoMessage(t, ch) testNoMessage(t, ch)
runStep(t, "add an entry", func(t *testing.T) { testutil.RunStep(t, "add an entry", func(t *testing.T) {
entry = h.Add("foo", 100*time.Millisecond) entry = h.Add("foo", 100*time.Millisecond)
assert.Equal(t, 0, entry.heapIndex) assert.Equal(t, 0, entry.heapIndex)
testMessage(t, ch) testMessage(t, ch)
testNoMessage(t, ch) // exactly one asserted above testNoMessage(t, ch) // exactly one asserted above
}) })
runStep(t, "add a second entry in front", func(t *testing.T) { testutil.RunStep(t, "add a second entry in front", func(t *testing.T) {
entry2 = h.Add("bar", 50*time.Millisecond) entry2 = h.Add("bar", 50*time.Millisecond)
assert.Equal(t, 0, entry2.heapIndex) assert.Equal(t, 0, entry2.heapIndex)
assert.Equal(t, 1, entry.heapIndex) assert.Equal(t, 1, entry.heapIndex)
@ -33,13 +35,13 @@ func TestExpiryHeap(t *testing.T) {
testNoMessage(t, ch) // exactly one asserted above testNoMessage(t, ch) // exactly one asserted above
}) })
runStep(t, "add a third entry at the end", func(t *testing.T) { testutil.RunStep(t, "add a third entry at the end", func(t *testing.T) {
entry3 = h.Add("baz", 1000*time.Millisecond) entry3 = h.Add("baz", 1000*time.Millisecond)
assert.Equal(t, 2, entry3.heapIndex) assert.Equal(t, 2, entry3.heapIndex)
testNoMessage(t, ch) // no notify cause index 0 stayed the same testNoMessage(t, ch) // no notify cause index 0 stayed the same
}) })
runStep(t, "remove the first entry", func(t *testing.T) { testutil.RunStep(t, "remove the first entry", func(t *testing.T) {
h.Remove(0) h.Remove(0)
assert.Equal(t, 0, entry.heapIndex) assert.Equal(t, 0, entry.heapIndex)
assert.Equal(t, 1, entry3.heapIndex) assert.Equal(t, 1, entry3.heapIndex)
@ -47,7 +49,7 @@ func TestExpiryHeap(t *testing.T) {
testNoMessage(t, ch) testNoMessage(t, ch)
}) })
runStep(t, "update so that entry3 expires first", func(t *testing.T) { testutil.RunStep(t, "update so that entry3 expires first", func(t *testing.T) {
h.Update(entry.heapIndex, 2000*time.Millisecond) h.Update(entry.heapIndex, 2000*time.Millisecond)
assert.Equal(t, 1, entry.heapIndex) assert.Equal(t, 1, entry.heapIndex)
assert.Equal(t, 0, entry3.heapIndex) assert.Equal(t, 0, entry3.heapIndex)
@ -55,7 +57,7 @@ func TestExpiryHeap(t *testing.T) {
testNoMessage(t, ch) testNoMessage(t, ch)
}) })
runStep(t, "0th element change triggers a notify", func(t *testing.T) { testutil.RunStep(t, "0th element change triggers a notify", func(t *testing.T) {
h.Update(entry3.heapIndex, 1500*time.Millisecond) h.Update(entry3.heapIndex, 1500*time.Millisecond)
assert.Equal(t, 1, entry.heapIndex) // no move assert.Equal(t, 1, entry.heapIndex) // no move
assert.Equal(t, 0, entry3.heapIndex) assert.Equal(t, 0, entry3.heapIndex)
@ -63,7 +65,7 @@ func TestExpiryHeap(t *testing.T) {
testNoMessage(t, ch) // one message testNoMessage(t, ch) // one message
}) })
runStep(t, "update can not decrease expiry time", func(t *testing.T) { testutil.RunStep(t, "update can not decrease expiry time", func(t *testing.T) {
h.Update(entry.heapIndex, 100*time.Millisecond) h.Update(entry.heapIndex, 100*time.Millisecond)
assert.Equal(t, 1, entry.heapIndex) // no move assert.Equal(t, 1, entry.heapIndex) // no move
assert.Equal(t, 0, entry3.heapIndex) assert.Equal(t, 0, entry3.heapIndex)
@ -91,8 +93,8 @@ func testMessage(t *testing.T, ch <-chan struct{}) {
} }
} }
// TODO: remove this function after all usages have been switched over
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
if !t.Run(name, fn) { t.Helper()
t.FailNow() testutil.RunStep(t, name, fn)
}
} }

View File

@ -17,3 +17,12 @@ func RequireErrorContains(t testing.TB, err error, expectedErrorMessage string)
t.Fatalf("expected err %v to contain %q", err, expectedErrorMessage) t.Fatalf("expected err %v to contain %q", err, expectedErrorMessage)
} }
} }
// RunStep is a test helper to help you stop a series of subtests from
// executing after the first one that fails.
func RunStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}