diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 8aeb0fa45c..fc637f9d40 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1627,12 +1627,14 @@ func TestStateStore_Services(t *testing.T) { func TestStateStore_ServicesByNodeMeta(t *testing.T) { s := testStateStore(t) - // Listing with no results returns nil. ws := memdb.NewWatchSet() - idx, res, err := s.ServicesByNodeMeta(ws, map[string]string{"somekey": "somevalue"}, nil) - if idx != 0 || len(res) != 0 || err != nil { - t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) - } + + t.Run("Listing with no results returns nil", func(t *testing.T) { + idx, res, err := s.ServicesByNodeMeta(ws, map[string]string{"somekey": "somevalue"}, nil) + if idx != 0 || len(res) != 0 || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) + } + }) // Create some nodes and services in the state store. node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}} @@ -1664,94 +1666,97 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { t.Fatalf("err: %s", err) } if !watchFired(ws) { - t.Fatalf("bad") + t.Fatalf("expected the watch to be triggered by the queries") } - // Filter the services by the first node's meta value. ws = memdb.NewWatchSet() - _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"role": "client"}, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - expected := structs.Services{ - "redis": []string{"master", "prod"}, - } - sort.Strings(res["redis"]) - if !reflect.DeepEqual(res, expected) { - t.Fatalf("bad: %v %v", res, expected) - } - // Get all services using the common meta value - _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"common": "1"}, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - expected = structs.Services{ - "redis": []string{"master", "prod", "slave"}, - } - sort.Strings(res["redis"]) - if !reflect.DeepEqual(res, expected) { - t.Fatalf("bad: %v %v", res, expected) - } + t.Run("Filter the services by the first node's meta value", func(t *testing.T) { + _, res, err := s.ServicesByNodeMeta(ws, map[string]string{"role": "client"}, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + expected := structs.Services{ + "redis": []string{"master", "prod"}, + } + sort.Strings(res["redis"]) + require.Equal(t, expected, res) + }) - // Get an empty list for an invalid meta value - _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"invalid": "nope"}, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - expected = structs.Services{} - if !reflect.DeepEqual(res, expected) { - t.Fatalf("bad: %v %v", res, expected) - } + t.Run("Get all services using the common meta value", func(t *testing.T) { + _, res, err := s.ServicesByNodeMeta(ws, map[string]string{"common": "1"}, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + expected := structs.Services{ + "redis": []string{"master", "prod", "slave"}, + } + sort.Strings(res["redis"]) + require.Equal(t, expected, res) + }) - // Get the first node's service instance using multiple meta filters - _, res, err = s.ServicesByNodeMeta(ws, map[string]string{"role": "client", "common": "1"}, nil) - if err != nil { - t.Fatalf("err: %s", err) - } - expected = structs.Services{ - "redis": []string{"master", "prod"}, - } - sort.Strings(res["redis"]) - if !reflect.DeepEqual(res, expected) { - t.Fatalf("bad: %v %v", res, expected) - } + t.Run("Get an empty list for an invalid meta value", func(t *testing.T) { + _, res, err := s.ServicesByNodeMeta(ws, map[string]string{"invalid": "nope"}, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + expected := structs.Services{} + require.Equal(t, expected, res) + }) - // Sanity check the watch before we proceed. - if watchFired(ws) { - t.Fatalf("bad") - } + t.Run("Get the first node's service instance using multiple meta filters", func(t *testing.T) { + _, res, err := s.ServicesByNodeMeta(ws, map[string]string{"role": "client", "common": "1"}, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + expected := structs.Services{ + "redis": []string{"master", "prod"}, + } + sort.Strings(res["redis"]) + require.Equal(t, expected, res) + }) - // Registering some unrelated node + service should not fire the watch. - testRegisterNode(t, s, 4, "nope") - testRegisterService(t, s, 5, "nope", "nope") - if watchFired(ws) { - t.Fatalf("bad") - } + t.Run("Registering some unrelated node + service should not fire the watch.", func(t *testing.T) { + testRegisterNode(t, s, 4, "nope") + testRegisterService(t, s, 5, "nope", "nope") + if watchFired(ws) { + t.Fatalf("expected the watch to timeout and not be triggered") + } + }) - // Overwhelm the service tracking. - idx = 6 - for i := 0; i < 2*watchLimit; i++ { - node := fmt.Sprintf("many%d", i) - testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"}) - idx++ - testRegisterService(t, s, idx, node, "nope") - idx++ - } + t.Run("Uses watchLimit to limit the number of watches", func(t *testing.T) { + patchWatchLimit(t, 10) - // Now get a fresh watch, which will be forced to watch the whole - // service table. - ws = memdb.NewWatchSet() - _, _, err = s.ServicesByNodeMeta(ws, map[string]string{"common": "1"}, nil) - if err != nil { - t.Fatalf("err: %s", err) - } + var idx uint64 = 6 + for i := 0; i < watchLimit+2; i++ { + node := fmt.Sprintf("many%d", i) + testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"}) + idx++ + testRegisterService(t, s, idx, node, "nope") + idx++ + } - // Registering some unrelated node + service should not fire the watch. - testRegisterService(t, s, idx, "nope", "more-nope") - if !watchFired(ws) { - t.Fatalf("bad") - } + // Now get a fresh watch, which will be forced to watch the whole + // service table. + ws := memdb.NewWatchSet() + _, _, err := s.ServicesByNodeMeta(ws, map[string]string{"common": "1"}, nil) + require.NoError(t, err) + + testRegisterService(t, s, idx, "nope", "more-nope") + if !watchFired(ws) { + t.Fatalf("expected the watch to timeout and not be triggered") + } + }) +} + +// patchWatchLimit package variable. Not safe for concurrent use. Do not use +// with t.Parallel. +func patchWatchLimit(t *testing.T, limit int) { + oldLimit := watchLimit + watchLimit = limit + t.Cleanup(func() { + watchLimit = oldLimit + }) } func TestStateStore_ServiceNodes(t *testing.T) { @@ -1870,7 +1875,8 @@ func TestStateStore_ServiceNodes(t *testing.T) { // Overwhelm the node tracking. idx = 19 - for i := 0; i < 2*watchLimit; i++ { + patchWatchLimit(t, 10) + for i := 0; i < watchLimit+2; i++ { node := fmt.Sprintf("many%d", i) if err := s.EnsureNode(idx, &structs.Node{Node: node, Address: "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) @@ -2639,7 +2645,8 @@ func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) { } // Overwhelm the node tracking. - for i := 0; i < 2*watchLimit; i++ { + patchWatchLimit(t, 10) + for i := 0; i < watchLimit+2; i++ { node := fmt.Sprintf("many%d", idx) testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"}) idx++ @@ -2819,7 +2826,8 @@ func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) { } // Overwhelm the node tracking. - for i := 0; i < 2*watchLimit; i++ { + patchWatchLimit(t, 10) + for i := 0; i < watchLimit+2; i++ { node := fmt.Sprintf("many%d", idx) testRegisterNodeWithMeta(t, s, idx, node, map[string]string{"common": "1"}) idx++ diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 3692deb9dc..bab348fefa 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -82,7 +82,7 @@ var ( ErrMissingIntentionID = errors.New("Missing Intention ID") ) -const ( +var ( // watchLimit is used as a soft limit to cap how many watches we allow // for a given blocking query. If this is exceeded, then we will use a // higher-level watch that's less fine-grained. Choosing the perfect