diff --git a/.changelog/9247.txt b/.changelog/9247.txt new file mode 100644 index 0000000000..1021a80b1c --- /dev/null +++ b/.changelog/9247.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: ensure the order of results provided by /health/service/:serviceName is consistent with and without streaming enabled +``` diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index ac1e6e5285..efb1dba871 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sort" "time" "github.com/hashicorp/go-bexpr" @@ -206,6 +207,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) { return true, nil } +// sortCheckServiceNodes sorts the results to match memdb semantics +// Sort results by Node.Node, if 2 instances match, order by Service.ID +// Will allow result to be stable sorted and match queries without cache +func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) { + sort.SliceStable(serviceNodes.Nodes, func(i, j int) bool { + left := serviceNodes.Nodes[i] + right := serviceNodes.Nodes[j] + if left.Node.Node == right.Node.Node { + return left.Service.ID < right.Service.ID + } + return left.Node.Node < right.Node.Node + }) +} + // Result returns the structs.IndexedCheckServiceNodes stored by this view. func (s *healthView) Result(index uint64) (interface{}, error) { result := structs.IndexedCheckServiceNodes{ @@ -217,6 +232,8 @@ func (s *healthView) Result(index uint64) (interface{}, error) { for _, node := range s.state { result.Nodes = append(result.Nodes, node) } + sortCheckServiceNodes(&result) + return &result, nil } diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 768962aa83..b2581380fe 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -3,11 +3,12 @@ package cachetype import ( "errors" "fmt" - "sort" + "strings" "testing" "time" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -16,6 +17,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/types" ) func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { @@ -226,6 +228,56 @@ func getNamespace(ns string) string { return meta.GetNamespace() } +func TestOrderingConsistentWithMemDb(t *testing.T) { + index := uint64(42) + buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode { + newID, err := uuid.GenerateUUID() + require.NoError(t, err) + return structs.CheckServiceNode{ + Node: &structs.Node{ + ID: types.NodeID(strings.ToUpper(newID)), + Node: nodeName, + Address: nodeName, + Datacenter: "dc1", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Service: &structs.NodeService{ + ID: serviceID, + Service: "testService", + Port: 8080, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Checks: []*structs.HealthCheck{}, + } + } + zero := buildTestNode("a-zero-node", "testService:1") + one := buildTestNode("node1", "testService:1") + two := buildTestNode("node1", "testService:2") + three := buildTestNode("node2", "testService") + result := structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{ + three, two, zero, one, + }, + QueryMeta: structs.QueryMeta{ + Index: index, + }, + } + sortCheckServiceNodes(&result) + expected := structs.CheckServiceNodes{zero, one, two, three} + require.Equal(t, expected, result.Nodes) +} + func TestStreamingHealthServices_FullSnapshot(t *testing.T) { namespace := getNamespace("ns2") client := NewTestStreamingClient(namespace) @@ -261,7 +313,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { for _, csn := range r.Nodes { nodes = append(nodes, csn.Node.Node) } - sort.Strings(nodes) + // Result will be sorted alphabetically the same way as memdb return nodes }