From 45151090c15e2a494fe01dea087e8924390f3909 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Fri, 20 Nov 2020 16:23:35 +0100 Subject: [PATCH] [Streaming] Predictable order for results of /health/service/:serviceName to mimic memdb This ensures the result is consitent with/witout streaming Will partially fix #9239 --- .../cache-types/streaming_health_services.go | 19 ++++++ .../streaming_health_services_test.go | 58 ++++++++++++++++++- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index ac1e6e5285..dd6b4bc80f 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "reflect" + "sort" + "strings" "time" "github.com/hashicorp/go-bexpr" @@ -206,6 +208,21 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) { return true, nil } +// cachedHealResultSorter sorts the results to match memdb semantics +// Sort results by Node.Node, if 2 instances match, order by Service.ID +// Will allow result to be stable sorted and match queries without cache +func cachedHealResultSorter(serviceNodes *structs.IndexedCheckServiceNodes) { + sort.SliceStable(serviceNodes.Nodes, func(i, j int) bool { + left := serviceNodes.Nodes[i] + right := serviceNodes.Nodes[j] + res := strings.Compare(left.Node.Node, right.Node.Node) + if res != 0 { + return res < 0 + } + return strings.Compare(left.Service.ID, right.Service.ID) < 0 + }) +} + // Result returns the structs.IndexedCheckServiceNodes stored by this view. func (s *healthView) Result(index uint64) (interface{}, error) { result := structs.IndexedCheckServiceNodes{ @@ -217,6 +234,8 @@ func (s *healthView) Result(index uint64) (interface{}, error) { for _, node := range s.state { result.Nodes = append(result.Nodes, node) } + cachedHealResultSorter(&result) + return &result, nil } diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 768962aa83..570aa8e521 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -3,11 +3,12 @@ package cachetype import ( "errors" "fmt" - "sort" + "strings" "testing" "time" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -16,6 +17,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/types" ) func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { @@ -226,6 +228,58 @@ func getNamespace(ns string) string { return meta.GetNamespace() } +func TestOrderingConsistentWithMemDb(t *testing.T) { + index := uint64(42) + buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode { + newID, err := uuid.GenerateUUID() + require.NoError(t, err) + return structs.CheckServiceNode{ + Node: &structs.Node{ + ID: types.NodeID(strings.ToUpper(newID)), + Node: nodeName, + Address: nodeName, + Datacenter: "dc1", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Service: &structs.NodeService{ + ID: serviceID, + Service: "testService", + Port: 8080, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Checks: []*structs.HealthCheck{}, + } + } + zero := buildTestNode("a-zero-node", "testService:1") + one := buildTestNode("node1", "testService:1") + two := buildTestNode("node1", "testService:2") + three := buildTestNode("node2", "testService") + result := structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{ + three, two, zero, one, + }, + QueryMeta: structs.QueryMeta{ + Index: index, + }, + } + cachedHealResultSorter(&result) + require.Equal(t, result.Nodes[0], zero) + require.Equal(t, result.Nodes[1], one) + require.Equal(t, result.Nodes[2], two) + require.Equal(t, result.Nodes[3], three) +} + func TestStreamingHealthServices_FullSnapshot(t *testing.T) { namespace := getNamespace("ns2") client := NewTestStreamingClient(namespace) @@ -261,7 +315,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { for _, csn := range r.Nodes { nodes = append(nodes, csn.Node.Node) } - sort.Strings(nodes) + // Result will be sorted alphabetically the same way as memdb return nodes }