From fcc43a9a36fcea161a05461aa998854024f09697 Mon Sep 17 00:00:00 2001 From: Dan Stough Date: Tue, 6 Feb 2024 11:12:04 -0500 Subject: [PATCH] feat(v2dns): catalog v2 SOA and NS support (#20480) --- agent/discovery/discovery.go | 2 +- agent/discovery/query_fetcher_v2.go | 189 +++++++-- agent/discovery/query_fetcher_v2_test.go | 492 ++++++++++++++++++++++- agent/dns/router.go | 6 +- agent/dns/router_ce_test.go | 4 +- agent/dns/router_test.go | 202 +++++++++- 6 files changed, 844 insertions(+), 51 deletions(-) diff --git a/agent/discovery/discovery.go b/agent/discovery/discovery.go index c56a65f7a1..4468eb892e 100644 --- a/agent/discovery/discovery.go +++ b/agent/discovery/discovery.go @@ -43,7 +43,6 @@ func (e ECSNotGlobalError) Unwrap() error { type Query struct { QueryType QueryType QueryPayload QueryPayload - Limit int } // QueryType is used to filter service endpoints. @@ -84,6 +83,7 @@ type QueryPayload struct { Tag string // deprecated: use for V1 only SourceIP net.IP // deprecated: used for prepared queries Tenancy QueryTenancy // tenancy includes any additional labels specified before the domain + Limit int // The maximum number of records to return // v2 fields only EnableFailover bool diff --git a/agent/discovery/query_fetcher_v2.go b/agent/discovery/query_fetcher_v2.go index 0fd4dd9c74..0033c84dff 100644 --- a/agent/discovery/query_fetcher_v2.go +++ b/agent/discovery/query_fetcher_v2.go @@ -6,6 +6,7 @@ package discovery import ( "context" "fmt" + "math/rand" "net" "strings" "sync/atomic" @@ -13,6 +14,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "github.com/hashicorp/go-hclog" @@ -63,9 +65,62 @@ func (f *V2DataFetcher) FetchNodes(ctx Context, req *QueryPayload) ([]*Result, e } // FetchEndpoints fetches records for A/AAAA/CNAME or SRV requests for services -// TODO (v2-dns): Validate lookupType -func (f *V2DataFetcher) FetchEndpoints(ctx Context, req *QueryPayload, lookupType LookupType) ([]*Result, error) { - return nil, nil +func (f *V2DataFetcher) FetchEndpoints(reqContext Context, req *QueryPayload, lookupType LookupType) ([]*Result, error) { + if lookupType != LookupTypeService { + return nil, ErrNotSupported + } + + configCtx := f.dynamicConfig.Load().(*v2DataFetcherDynamicConfig) + + serviceEndpoints := pbcatalog.ServiceEndpoints{} + resourceObj, err := f.fetchResource(reqContext, *req, pbcatalog.ServiceEndpointsType, &serviceEndpoints) + if err != nil { + return nil, err + } + + // Shuffle the endpoints slice + shuffleFunc := func(i, j int) { + serviceEndpoints.Endpoints[i], serviceEndpoints.Endpoints[j] = serviceEndpoints.Endpoints[j], serviceEndpoints.Endpoints[i] + } + rand.Shuffle(len(serviceEndpoints.Endpoints), shuffleFunc) + + // Convert the service endpoints to results up to the limit + limit := req.Limit + if len(serviceEndpoints.Endpoints) < limit || limit == 0 { + limit = len(serviceEndpoints.Endpoints) + } + + results := make([]*Result, 0, limit) + for idx := 0; idx < limit; idx++ { + endpoint := serviceEndpoints.Endpoints[idx] + + // TODO (v2-dns): filter based on the port name requested + + address, err := f.addressFromWorkloadAddresses(endpoint.Addresses, req.Name) + if err != nil { + return nil, err + } + + weight, ok := getEndpointWeight(endpoint, configCtx) + if !ok { + continue + } + + result := &Result{ + Node: &Location{ + Address: address, + Name: endpoint.GetTargetRef().GetName(), + }, + Type: ResultTypeWorkload, // TODO (v2-dns): I'm not really sure if it's better to have SERVICE OR WORKLOAD here + Tenancy: ResultTenancy{ + Namespace: resourceObj.GetId().GetTenancy().GetNamespace(), + Partition: resourceObj.GetId().GetTenancy().GetPartition(), + }, + Weight: weight, + } + results = append(results, result) + } + return results, nil } // FetchVirtualIP fetches A/AAAA records for virtual IPs @@ -82,51 +137,22 @@ func (f *V2DataFetcher) FetchRecordsByIp(ctx Context, ip net.IP) ([]*Result, err // FetchWorkload is used to fetch a single workload from the V2 catalog. // V2-only. func (f *V2DataFetcher) FetchWorkload(reqContext Context, req *QueryPayload) (*Result, error) { - // Query the resource service for the workload by name and tenancy - resourceReq := pbresource.ReadRequest{ - Id: &pbresource.ID{ - Name: req.Name, - Type: pbcatalog.WorkloadType, - Tenancy: queryTenancyToResourceTenancy(req.Tenancy), - }, + workload := pbcatalog.Workload{} + resourceObj, err := f.fetchResource(reqContext, *req, pbcatalog.WorkloadType, &workload) + if err != nil { + return nil, err } - f.logger.Debug("fetching workload", "name", req.Name) - resourceCtx := metadata.AppendToOutgoingContext(context.Background(), "x-consul-token", reqContext.Token) - - // If the workload is not found, return nil and an error equivalent to NXDOMAIN - response, err := f.client.Read(resourceCtx, &resourceReq) - switch { - case grpcNotFoundErr(err): - f.logger.Debug("workload not found", "name", req.Name) - return nil, ErrNotFound - case err != nil: - f.logger.Error("error fetching workload", "name", req.Name) - return nil, fmt.Errorf("error fetching workload: %w", err) - // default: fallthrough + address, err := f.addressFromWorkloadAddresses(workload.Addresses, req.Name) + if err != nil { + return nil, err } - workload := &pbcatalog.Workload{} - data := response.GetResource().GetData() - if err := data.UnmarshalTo(workload); err != nil { - f.logger.Error("error unmarshalling workload", "name", req.Name) - return nil, fmt.Errorf("error unmarshalling workload: %w", err) - } - - // TODO: (v2-dns): we will need to intelligently return the right workload address based on either the translate - // address setting or the locality of the requester. Workloads must have at least one. - // We also need to make sure that we filter out unix sockets here. - address := workload.Addresses[0].GetHost() - if strings.HasPrefix(address, "unix://") { - f.logger.Error("unix sockets are currently unsupported in workload results", "name", req.Name) - return nil, ErrNotFound - } - - tenancy := response.GetResource().GetId().GetTenancy() + tenancy := resourceObj.GetId().GetTenancy() result := &Result{ Node: &Location{ Address: address, - Name: response.GetResource().GetId().GetName(), + Name: resourceObj.GetId().GetName(), }, Type: ResultTypeWorkload, Tenancy: ResultTenancy{ @@ -177,6 +203,87 @@ func (f *V2DataFetcher) ValidateRequest(_ Context, req *QueryPayload) error { return nil } +// fetchResource is used to read a single resource from the V2 catalog and cast into a concrete type. +func (f *V2DataFetcher) fetchResource(reqContext Context, req QueryPayload, kind *pbresource.Type, payload proto.Message) (*pbresource.Resource, error) { + // Query the resource service for the ServiceEndpoints by name and tenancy + resourceReq := pbresource.ReadRequest{ + Id: &pbresource.ID{ + Name: req.Name, + Type: kind, + Tenancy: queryTenancyToResourceTenancy(req.Tenancy), + }, + } + + f.logger.Debug("fetching "+kind.String(), "name", req.Name) + resourceCtx := metadata.AppendToOutgoingContext(context.Background(), "x-consul-token", reqContext.Token) + + // If the service is not found, return nil and an error equivalent to NXDOMAIN + response, err := f.client.Read(resourceCtx, &resourceReq) + switch { + case grpcNotFoundErr(err): + f.logger.Debug(kind.String()+" not found", "name", req.Name) + return nil, ErrNotFound + case err != nil: + f.logger.Error("error fetching "+kind.String(), "name", req.Name) + return nil, fmt.Errorf("error fetching %s: %w", kind.String(), err) + // default: fallthrough + } + + data := response.GetResource().GetData() + if err := data.UnmarshalTo(payload); err != nil { + f.logger.Error("error unmarshalling "+kind.String(), "name", req.Name) + return nil, fmt.Errorf("error unmarshalling %s: %w", kind.String(), err) + } + return response.GetResource(), nil +} + +// addressFromWorkloadAddresses returns one address from the workload addresses. +func (f *V2DataFetcher) addressFromWorkloadAddresses(addresses []*pbcatalog.WorkloadAddress, name string) (string, error) { + // TODO: (v2-dns): we will need to intelligently return the right workload address based on either the translate + // address setting or the locality of the requester. Workloads must have at least one. + // We also need to make sure that we filter out unix sockets here. + address := addresses[0].GetHost() + if strings.HasPrefix(address, "unix://") { + f.logger.Error("unix sockets are currently unsupported in workload results", "name", name) + return "", ErrNotFound + } + return address, nil +} + +// getEndpointWeight returns the weight of the endpoint and a boolean indicating if the endpoint should be included +// based on it's health status. +func getEndpointWeight(endpoint *pbcatalog.Endpoint, configCtx *v2DataFetcherDynamicConfig) (uint32, bool) { + health := endpoint.GetHealthStatus().Enum() + if health == nil { + return 0, false + } + + // Filter based on health status and agent config + // This is also a good opportunity to see if SRV weights are set + var weight uint32 + switch *health { + case pbcatalog.Health_HEALTH_PASSING: + weight = endpoint.GetDns().GetWeights().GetPassing() + case pbcatalog.Health_HEALTH_CRITICAL: + return 0, false // always filtered out + case pbcatalog.Health_HEALTH_WARNING: + if configCtx.onlyPassing { + return 0, false // filtered out + } + weight = endpoint.GetDns().GetWeights().GetWarning() + default: + // Everything else can be filtered out + return 0, false + } + + // Important! double-check the weight in the case DNS weights are not set + if weight == 0 { + weight = 1 + } + return weight, true +} + +// queryTenancyToResourceTenancy converts a QueryTenancy to a pbresource.Tenancy. func queryTenancyToResourceTenancy(qTenancy QueryTenancy) *pbresource.Tenancy { rTenancy := resource.DefaultNamespacedTenancy() diff --git a/agent/discovery/query_fetcher_v2_test.go b/agent/discovery/query_fetcher_v2_test.go index f93e3d5f48..9ace561f13 100644 --- a/agent/discovery/query_fetcher_v2_test.go +++ b/agent/discovery/query_fetcher_v2_test.go @@ -21,6 +21,10 @@ import ( "github.com/hashicorp/consul/sdk/testutil" ) +var ( + unknownErr = errors.New("I don't feel so good") +) + // Test_FetchService tests the FetchService method in scenarios where the RPC // call succeeds and fails. func Test_FetchWorkload(t *testing.T) { @@ -29,8 +33,6 @@ func Test_FetchWorkload(t *testing.T) { DNSOnlyPassing: false, } - unknownErr := errors.New("I don't feel so good") - tests := []struct { name string queryPayload *QueryPayload @@ -215,6 +217,436 @@ func Test_FetchWorkload(t *testing.T) { } } +// Test_V2FetchEndpoints the FetchService method in scenarios where the RPC +// call succeeds and fails. +func Test_V2FetchEndpoints(t *testing.T) { + + tests := []struct { + name string + queryPayload *QueryPayload + context Context + configureMockClient func(mockClient *mockpbresource.ResourceServiceClient_Expecter) + rc *config.RuntimeConfig + expectedResult []*Result + expectedErr error + verifyShuffle bool + }{ + { + name: "FetchEndpoints returns result", + queryPayload: &QueryPayload{ + Name: "consul", + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + results := []*pbcatalog.Endpoint{ + makeEndpoint("consul-1", "1.2.3.4", pbcatalog.Health_HEALTH_PASSING, 0, 0), + } + + result := getTestEndpointsResponse(t, "", "", results...) + mockClient.Read(mock.Anything, mock.Anything). + Return(result, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + expectedResult: []*Result{ + { + Node: &Location{Name: "consul-1", Address: "1.2.3.4"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + }, + }, + { + name: "FetchEndpoints returns empty result with no endpoints", + queryPayload: &QueryPayload{ + Name: "consul", + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + + result := getTestEndpointsResponse(t, "", "") + mockClient.Read(mock.Anything, mock.Anything). + Return(result, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + expectedResult: []*Result{}, + }, + { + name: "FetchEndpoints returns a name error when the ServiceEndpoint does not exist", + queryPayload: &QueryPayload{ + Name: "consul", + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + + result := getTestEndpointsResponse(t, "", "") + mockClient.Read(mock.Anything, mock.Anything). + Return(nil, status.Error(codes.NotFound, "not found")). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + expectedErr: ErrNotFound, + }, + { + name: "FetchEndpoints encounters a resource client error", + queryPayload: &QueryPayload{ + Name: "consul", + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + + result := getTestEndpointsResponse(t, "", "") + mockClient.Read(mock.Anything, mock.Anything). + Return(nil, unknownErr). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + expectedErr: unknownErr, + }, + { + name: "FetchEndpoints always filters out critical endpoints; DNS weights applied correctly", + queryPayload: &QueryPayload{ + Name: "consul", + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + results := []*pbcatalog.Endpoint{ + makeEndpoint("consul-1", "1.2.3.4", pbcatalog.Health_HEALTH_PASSING, 2, 3), + makeEndpoint("consul-2", "2.3.4.5", pbcatalog.Health_HEALTH_WARNING, 2, 3), + makeEndpoint("consul-3", "3.4.5.6", pbcatalog.Health_HEALTH_CRITICAL, 2, 3), + } + + result := getTestEndpointsResponse(t, "", "", results...) + mockClient.Read(mock.Anything, mock.Anything). + Return(result, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + expectedResult: []*Result{ + { + Node: &Location{Name: "consul-1", Address: "1.2.3.4"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 2, + }, + { + Node: &Location{Name: "consul-2", Address: "2.3.4.5"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 3, + }, + }, + }, + { + name: "FetchEndpoints filters out warning endpoints when DNSOnlyPassing is true", + queryPayload: &QueryPayload{ + Name: "consul", + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + results := []*pbcatalog.Endpoint{ + makeEndpoint("consul-1", "1.2.3.4", pbcatalog.Health_HEALTH_PASSING, 2, 3), + makeEndpoint("consul-2", "2.3.4.5", pbcatalog.Health_HEALTH_WARNING, 2, 3), + makeEndpoint("consul-3", "3.4.5.6", pbcatalog.Health_HEALTH_CRITICAL, 2, 3), + } + + result := getTestEndpointsResponse(t, "", "", results...) + mockClient.Read(mock.Anything, mock.Anything). + Return(result, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + rc: &config.RuntimeConfig{ + DNSOnlyPassing: true, + }, + expectedResult: []*Result{ + { + Node: &Location{Name: "consul-1", Address: "1.2.3.4"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 2, + }, + }, + }, + { + name: "FetchEndpoints shuffles the results", + queryPayload: &QueryPayload{ + Name: "consul", + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + results := []*pbcatalog.Endpoint{ + // use a set of 10 elements, the odds of getting the same result are 1 in 3628800 + makeEndpoint("consul-1", "10.0.0.1", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-2", "10.0.0.2", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-3", "10.0.0.3", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-4", "10.0.0.4", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-5", "10.0.0.5", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-6", "10.0.0.6", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-7", "10.0.0.7", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-8", "10.0.0.8", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-9", "10.0.0.9", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-10", "10.0.0.10", pbcatalog.Health_HEALTH_PASSING, 0, 0), + } + + result := getTestEndpointsResponse(t, "", "", results...) + mockClient.Read(mock.Anything, mock.Anything). + Return(result, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + expectedResult: []*Result{ + { + Node: &Location{Name: "consul-1", Address: "10.0.0.1"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-2", Address: "10.0.0.2"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-3", Address: "10.0.0.3"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-4", Address: "10.0.0.4"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-5", Address: "10.0.0.5"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-6", Address: "10.0.0.6"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-7", Address: "10.0.0.7"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-8", Address: "10.0.0.8"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-9", Address: "10.0.0.9"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + { + Node: &Location{Name: "consul-10", Address: "10.0.0.10"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + }, + verifyShuffle: true, + }, + { + name: "FetchEndpoints returns only the specified limit", + queryPayload: &QueryPayload{ + Name: "consul", + Limit: 1, + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + results := []*pbcatalog.Endpoint{ + // intentionally all the same to make this easier to verify + makeEndpoint("consul-1", "10.0.0.1", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-1", "10.0.0.1", pbcatalog.Health_HEALTH_PASSING, 0, 0), + makeEndpoint("consul-1", "10.0.0.1", pbcatalog.Health_HEALTH_PASSING, 0, 0), + } + + result := getTestEndpointsResponse(t, "", "", results...) + mockClient.Read(mock.Anything, mock.Anything). + Return(result, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + }) + }, + expectedResult: []*Result{ + { + Node: &Location{Name: "consul-1", Address: "10.0.0.1"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: resource.DefaultNamespaceName, + Partition: resource.DefaultPartitionName, + }, + Weight: 1, + }, + }, + }, + { + name: "FetchEndpoints returns results with non-default tenancy", + queryPayload: &QueryPayload{ + Name: "consul", + Tenancy: QueryTenancy{ + Namespace: "test-namespace", + Partition: "test-partition", + }, + }, + context: Context{ + Token: "test-token", + }, + configureMockClient: func(mockClient *mockpbresource.ResourceServiceClient_Expecter) { + results := []*pbcatalog.Endpoint{ + // intentionally all the same to make this easier to verify + makeEndpoint("consul-1", "10.0.0.1", pbcatalog.Health_HEALTH_PASSING, 0, 0), + } + + result := getTestEndpointsResponse(t, "test-namespace", "test-partition", results...) + mockClient.Read(mock.Anything, mock.Anything). + Return(result, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.Equal(t, result.GetResource().GetId().GetName(), req.Id.Name) + require.Equal(t, result.GetResource().GetId().GetTenancy().GetNamespace(), req.Id.Tenancy.Namespace) + require.Equal(t, result.GetResource().GetId().GetTenancy().GetPartition(), req.Id.Tenancy.Partition) + }) + }, + expectedResult: []*Result{ + { + Node: &Location{Name: "consul-1", Address: "10.0.0.1"}, + Type: ResultTypeWorkload, + Tenancy: ResultTenancy{ + Namespace: "test-namespace", + Partition: "test-partition", + }, + Weight: 1, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger := testutil.Logger(t) + + client := mockpbresource.NewResourceServiceClient(t) + mockClient := client.EXPECT() + tc.configureMockClient(mockClient) + + if tc.rc == nil { + tc.rc = &config.RuntimeConfig{ + DNSOnlyPassing: false, + } + } + + df := NewV2DataFetcher(tc.rc, client, logger) + + result, err := df.FetchEndpoints(tc.context, tc.queryPayload, LookupTypeService) + require.True(t, errors.Is(err, tc.expectedErr)) + + if tc.verifyShuffle { + require.NotEqualf(t, tc.expectedResult, result, "expected result to be shuffled. There is a small probability that it shuffled back to the original order. In that case, you may want to play the lottery.") + } + + require.ElementsMatchf(t, tc.expectedResult, result, "elements of results should match") + }) + } +} + func getTestWorkloadResponse(t *testing.T, nsOverride string, partitionOverride string) *pbresource.ReadResponse { workload := &pbcatalog.Workload{ Addresses: []*pbcatalog.WorkloadAddress{ @@ -239,7 +671,61 @@ func getTestWorkloadResponse(t *testing.T, nsOverride string, partitionOverride Id: &pbresource.ID{ Name: "foo-1234", Type: pbcatalog.WorkloadType, - Tenancy: resource.DefaultNamespacedTenancy(), // TODO (v2-dns): tenancy + Tenancy: resource.DefaultNamespacedTenancy(), + }, + Data: data, + }, + } + + if nsOverride != "" { + resp.Resource.Id.Tenancy.Namespace = nsOverride + } + if partitionOverride != "" { + resp.Resource.Id.Tenancy.Partition = partitionOverride + } + + return resp +} + +func makeEndpoint(name string, address string, health pbcatalog.Health, weightPassing, weightWarning uint32) *pbcatalog.Endpoint { + endpoint := &pbcatalog.Endpoint{ + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: address, + }, + }, + HealthStatus: health, + TargetRef: &pbresource.ID{ + Name: name, + }, + } + + if weightPassing > 0 || weightWarning > 0 { + endpoint.Dns = &pbcatalog.DNSPolicy{ + Weights: &pbcatalog.Weights{ + Passing: weightPassing, + Warning: weightWarning, + }, + } + } + + return endpoint +} + +func getTestEndpointsResponse(t *testing.T, nsOverride string, partitionOverride string, endpoints ...*pbcatalog.Endpoint) *pbresource.ReadResponse { + serviceEndpoints := &pbcatalog.ServiceEndpoints{ + Endpoints: endpoints, + } + + data, err := anypb.New(serviceEndpoints) + require.NoError(t, err) + + resp := &pbresource.ReadResponse{ + Resource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "consul", + Type: pbcatalog.ServiceType, + Tenancy: resource.DefaultNamespacedTenancy(), }, Data: data, }, diff --git a/agent/dns/router.go b/agent/dns/router.go index 965d286f10..405af9ed2a 100644 --- a/agent/dns/router.go +++ b/agent/dns/router.go @@ -291,8 +291,8 @@ func (r *Router) getQueryResults(req *dns.Msg, reqCtx Context, reqType requestTy // need to add something to disambiguate the empty field. Partition: resource.DefaultPartitionName, }, + Limit: 3, }, - Limit: 3, // TODO (v2-dns): need to thread this through to the backend and make sure we shuffle the results } results, err := r.processor.QueryByName(query, discovery.Context{Token: reqCtx.Token}) @@ -844,7 +844,7 @@ func (r *Router) getAnswerExtraAndNs(result *discovery.Result, req *dns.Msg, req answer = append(answer, ptr) case qType == dns.TypeNS: // TODO (v2-dns): fqdn in V1 has the datacenter included, this would need to be added to discovery.Result - fqdn := canonicalNameForResult(result.Type, serviceAddress.String(), domain, result.Tenancy, result.PortName) + fqdn := canonicalNameForResult(result.Type, result.Node.Name, domain, result.Tenancy, result.PortName) extraRecord := makeIPBasedRecord(fqdn, nodeAddress, ttl) // TODO (v2-dns): this is not sufficient, because recursion and CNAMES are supported answer = append(answer, makeNSRecord(domain, fqdn, ttl)) @@ -852,7 +852,7 @@ func (r *Router) getAnswerExtraAndNs(result *discovery.Result, req *dns.Msg, req case qType == dns.TypeSOA: // TODO (v2-dns): fqdn in V1 has the datacenter included, this would need to be added to discovery.Result // to be returned in the result. - fqdn := canonicalNameForResult(result.Type, serviceAddress.String(), domain, result.Tenancy, result.PortName) + fqdn := canonicalNameForResult(result.Type, result.Node.Name, domain, result.Tenancy, result.PortName) extraRecord := makeIPBasedRecord(fqdn, nodeAddress, ttl) // TODO (v2-dns): this is not sufficient, because recursion and CNAMES are supported ns = append(ns, makeNSRecord(domain, fqdn, ttl)) diff --git a/agent/dns/router_ce_test.go b/agent/dns/router_ce_test.go index 72455249fc..3dd63eeee6 100644 --- a/agent/dns/router_ce_test.go +++ b/agent/dns/router_ce_test.go @@ -37,7 +37,7 @@ func getAdditionalTestCases(t *testing.T) []HandleTestCase { configureDataFetcher: func(fetcher discovery.CatalogDataFetcher) { results := []*discovery.Result{ { - Node: &discovery.Location{Name: "foonode", Address: "1.2.3.4"}, + Node: &discovery.Location{Name: "foo", Address: "1.2.3.4"}, Type: discovery.ResultTypeNode, Service: &discovery.Location{Name: "foo", Address: "foo"}, Tenancy: discovery.ResultTenancy{ @@ -100,7 +100,7 @@ func getAdditionalTestCases(t *testing.T) []HandleTestCase { configureDataFetcher: func(fetcher discovery.CatalogDataFetcher) { results := []*discovery.Result{ { - Node: &discovery.Location{Name: "foonode", Address: "1.2.3.4"}, + Node: &discovery.Location{Name: "foo", Address: "1.2.3.4"}, Service: &discovery.Location{Name: "foo", Address: "foo"}, Type: discovery.ResultTypeService, Tenancy: discovery.ResultTenancy{ diff --git a/agent/dns/router_test.go b/agent/dns/router_test.go index 36c9eb8331..220ae27f38 100644 --- a/agent/dns/router_test.go +++ b/agent/dns/router_test.go @@ -833,6 +833,7 @@ func Test_HandleRequest(t *testing.T) { require.Equal(t, discovery.LookupTypeService, reqType) require.Equal(t, structs.ConsulServiceName, req.Name) + require.Equal(t, 3, req.Limit) }) }, validateAndNormalizeExpected: true, @@ -955,6 +956,7 @@ func Test_HandleRequest(t *testing.T) { require.Equal(t, discovery.LookupTypeService, reqType) require.Equal(t, structs.ConsulServiceName, req.Name) + require.Equal(t, 3, req.Limit) }) }, validateAndNormalizeExpected: true, @@ -1031,6 +1033,204 @@ func Test_HandleRequest(t *testing.T) { }, }, }, + // NS Queries + { + name: "vanilla NS query", + request: &dns.Msg{ + MsgHdr: dns.MsgHdr{ + Opcode: dns.OpcodeQuery, + }, + Question: []dns.Question{ + { + Name: "consul.", + Qtype: dns.TypeNS, + Qclass: dns.ClassINET, + }, + }, + }, + configureDataFetcher: func(fetcher discovery.CatalogDataFetcher) { + fetcher.(*discovery.MockCatalogDataFetcher). + On("FetchEndpoints", mock.Anything, mock.Anything, mock.Anything). + Return([]*discovery.Result{ + { + Node: &discovery.Location{Name: "server-one", Address: "1.2.3.4"}, + Type: discovery.ResultTypeWorkload, + }, + { + Node: &discovery.Location{Name: "server-two", Address: "4.5.6.7"}, + Type: discovery.ResultTypeWorkload, + }, + }, nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*discovery.QueryPayload) + reqType := args.Get(2).(discovery.LookupType) + + require.Equal(t, discovery.LookupTypeService, reqType) + require.Equal(t, structs.ConsulServiceName, req.Name) + require.Equal(t, 3, req.Limit) + }) + }, + validateAndNormalizeExpected: true, + response: &dns.Msg{ + MsgHdr: dns.MsgHdr{ + Opcode: dns.OpcodeQuery, + Response: true, + Authoritative: true, + }, + Compress: true, + Question: []dns.Question{ + { + Name: "consul.", + Qtype: dns.TypeNS, + Qclass: dns.ClassINET, + }, + }, + Answer: []dns.RR{ + &dns.NS{ + Hdr: dns.RR_Header{ + Name: "consul.", + Rrtype: dns.TypeNS, + Class: dns.ClassINET, + Ttl: 123, + }, + Ns: "server-one.workload.consul.", // TODO (v2-dns): this format needs to be consistent with other workloads + }, + &dns.NS{ + Hdr: dns.RR_Header{ + Name: "consul.", + Rrtype: dns.TypeNS, + Class: dns.ClassINET, + Ttl: 123, + }, + Ns: "server-two.workload.consul.", + }, + }, + Extra: []dns.RR{ + &dns.A{ + Hdr: dns.RR_Header{ + Name: "server-one.workload.consul.", + Rrtype: dns.TypeA, + Class: dns.ClassINET, + Ttl: 123, + }, + A: net.ParseIP("1.2.3.4"), + }, + &dns.A{ + Hdr: dns.RR_Header{ + Name: "server-two.workload.consul.", + Rrtype: dns.TypeA, + Class: dns.ClassINET, + Ttl: 123, + }, + A: net.ParseIP("4.5.6.7"), + }, + }, + }, + }, + { + name: "NS query against alternate domain", + request: &dns.Msg{ + MsgHdr: dns.MsgHdr{ + Opcode: dns.OpcodeQuery, + }, + Question: []dns.Question{ + { + Name: "testdomain.", + Qtype: dns.TypeNS, + Qclass: dns.ClassINET, + }, + }, + }, + agentConfig: &config.RuntimeConfig{ + DNSDomain: "consul", + DNSAltDomain: "testdomain", + DNSNodeTTL: 123 * time.Second, + DNSSOA: config.RuntimeSOAConfig{ + Refresh: 1, + Retry: 2, + Expire: 3, + Minttl: 4, + }, + }, + configureDataFetcher: func(fetcher discovery.CatalogDataFetcher) { + fetcher.(*discovery.MockCatalogDataFetcher). + On("FetchEndpoints", mock.Anything, mock.Anything, mock.Anything). + Return([]*discovery.Result{ + { + Node: &discovery.Location{Name: "server-one", Address: "1.2.3.4"}, + Type: discovery.ResultTypeWorkload, + }, + { + Node: &discovery.Location{Name: "server-two", Address: "4.5.6.7"}, + Type: discovery.ResultTypeWorkload, + }, + }, nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*discovery.QueryPayload) + reqType := args.Get(2).(discovery.LookupType) + + require.Equal(t, discovery.LookupTypeService, reqType) + require.Equal(t, structs.ConsulServiceName, req.Name) + require.Equal(t, 3, req.Limit) + }) + }, + validateAndNormalizeExpected: true, + response: &dns.Msg{ + MsgHdr: dns.MsgHdr{ + Opcode: dns.OpcodeQuery, + Response: true, + Authoritative: true, + }, + Compress: true, + Question: []dns.Question{ + { + Name: "testdomain.", + Qtype: dns.TypeNS, + Qclass: dns.ClassINET, + }, + }, + Answer: []dns.RR{ + &dns.NS{ + Hdr: dns.RR_Header{ + Name: "testdomain.", + Rrtype: dns.TypeNS, + Class: dns.ClassINET, + Ttl: 123, + }, + Ns: "server-one.workload.testdomain.", + }, + &dns.NS{ + Hdr: dns.RR_Header{ + Name: "testdomain.", + Rrtype: dns.TypeNS, + Class: dns.ClassINET, + Ttl: 123, + }, + Ns: "server-two.workload.testdomain.", + }, + }, + Extra: []dns.RR{ + &dns.A{ + Hdr: dns.RR_Header{ + Name: "server-one.workload.testdomain.", + Rrtype: dns.TypeA, + Class: dns.ClassINET, + Ttl: 123, + }, + A: net.ParseIP("1.2.3.4"), + }, + &dns.A{ + Hdr: dns.RR_Header{ + Name: "server-two.workload.testdomain.", + Rrtype: dns.TypeA, + Class: dns.ClassINET, + Ttl: 123, + }, + A: net.ParseIP("4.5.6.7"), + }, + }, + }, + }, // PTR Lookups { name: "PTR lookup for node, query type is ANY", @@ -1579,7 +1779,7 @@ func Test_HandleRequest(t *testing.T) { }, } - //testCases = append(testCases, getAdditionalTestCases(t)...) + testCases = append(testCases, getAdditionalTestCases(t)...) run := func(t *testing.T, tc HandleTestCase) { cdf := discovery.NewMockCatalogDataFetcher(t)