[NET-7015] DNS v2 + Catalog v2 int test (#20607)

test(v2dns): Add Catalog v2 integration test

Add a basic integration test covering major functionality tested against
Catalog v2 resources. This complements existing tests that ensure
compatibility between v1 and v2 DNS when testing against Catalog v1
resources.
This commit is contained in:
Michael Zalimeni 2024-02-13 12:40:08 -05:00 committed by GitHub
parent ab3c6cf1e5
commit 2c1addfd64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

515
agent/dns_catalogv2_test.go Normal file
View File

@ -0,0 +1,515 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package agent
import (
"context"
"fmt"
"net"
"testing"
"github.com/miekg/dns"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
)
// Similar to TestDNS_ServiceLookup, but removes config for features unsupported in v2 and
// tests against DNS v2 and Catalog v2 explicitly using a resource API client.
func TestDNS_CatalogV2_Basic(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
var err error
a := NewTestAgent(t, `experiments=["resource-apis"]`) // v2dns is implicit w/ resource-apis
defer a.Shutdown()
testrpc.WaitForRaftLeader(t, a.RPC, "dc1")
client := a.delegate.ResourceServiceClient()
// Smoke test for `consul-server` service.
readResource(t, client, &pbresource.ID{
Name: structs.ConsulServiceName,
Type: pbcatalog.ServiceType,
Tenancy: resource.DefaultNamespacedTenancy(),
}, new(pbcatalog.Service))
// Register a new service.
dbServiceId := &pbresource.ID{
Name: "db",
Type: pbcatalog.ServiceType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
emptyServiceId := &pbresource.ID{
Name: "empty",
Type: pbcatalog.ServiceType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
dbService := &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"db-"},
},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
{
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
}
emptyService := &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{"empty-"},
},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: "tcp",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
{
TargetPort: "admin",
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
},
}
dbServiceResource := &pbresource.Resource{
Id: dbServiceId,
Data: toAny(t, dbService),
}
emptyServiceResource := &pbresource.Resource{
Id: emptyServiceId,
Data: toAny(t, emptyService),
}
for _, r := range []*pbresource.Resource{dbServiceResource, emptyServiceResource} {
_, err := client.Write(context.Background(), &pbresource.WriteRequest{Resource: r})
if err != nil {
t.Fatalf("failed to create the %s service: %v", r.Id.Name, err)
}
}
// Validate services written.
readResource(t, client, dbServiceId, new(pbcatalog.Service))
readResource(t, client, emptyServiceId, new(pbcatalog.Service))
// Register workloads.
dbWorkloadId1 := &pbresource.ID{
Name: "db-1",
Type: pbcatalog.WorkloadType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
dbWorkloadId2 := &pbresource.ID{
Name: "db-2",
Type: pbcatalog.WorkloadType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
dbWorkloadId3 := &pbresource.ID{
Name: "db-3",
Type: pbcatalog.WorkloadType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
dbWorkloadPorts := map[string]*pbcatalog.WorkloadPort{
"tcp": {
Port: 12345,
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
"admin": {
Port: 23456,
Protocol: pbcatalog.Protocol_PROTOCOL_HTTP,
},
"mesh": {
Port: 20000,
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
},
}
dbWorkloadFn := func(ip string) *pbcatalog.Workload {
return &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{
Host: ip,
},
},
Identity: "test-identity",
Ports: dbWorkloadPorts,
}
}
dbWorkload1 := dbWorkloadFn("172.16.1.1")
_, err = client.Write(context.Background(), &pbresource.WriteRequest{Resource: &pbresource.Resource{
Id: dbWorkloadId1,
Data: toAny(t, dbWorkload1),
}})
if err != nil {
t.Fatalf("failed to create the %s workload: %v", dbWorkloadId1.Name, err)
}
dbWorkload2 := dbWorkloadFn("172.16.1.2")
_, err = client.Write(context.Background(), &pbresource.WriteRequest{Resource: &pbresource.Resource{
Id: dbWorkloadId2,
Data: toAny(t, dbWorkload2),
}})
if err != nil {
t.Fatalf("failed to create the %s workload: %v", dbWorkloadId2.Name, err)
}
dbWorkload3 := dbWorkloadFn("2001:db8:85a3::8a2e:370:7334") // test IPv6
_, err = client.Write(context.Background(), &pbresource.WriteRequest{Resource: &pbresource.Resource{
Id: dbWorkloadId3,
Data: toAny(t, dbWorkload3),
}})
if err != nil {
t.Fatalf("failed to create the %s workload: %v", dbWorkloadId2.Name, err)
}
// Validate workloads written.
dbWorkloads := make(map[string]*pbcatalog.Workload)
dbWorkloads["db-1"] = readResource(t, client, dbWorkloadId1, new(pbcatalog.Workload)).(*pbcatalog.Workload)
dbWorkloads["db-2"] = readResource(t, client, dbWorkloadId2, new(pbcatalog.Workload)).(*pbcatalog.Workload)
dbWorkloads["db-3"] = readResource(t, client, dbWorkloadId3, new(pbcatalog.Workload)).(*pbcatalog.Workload)
// Ensure endpoints exist and have health status, which is required for inclusion in DNS results.
retry.Run(t, func(r *retry.R) {
endpoints := readResource(r, client, resource.ReplaceType(pbcatalog.ServiceEndpointsType, dbServiceId), new(pbcatalog.ServiceEndpoints)).(*pbcatalog.ServiceEndpoints)
require.Equal(r, 3, len(endpoints.GetEndpoints()))
for _, e := range endpoints.GetEndpoints() {
require.True(r,
// We only return results for passing and warning health checks.
e.HealthStatus == pbcatalog.Health_HEALTH_PASSING || e.HealthStatus == pbcatalog.Health_HEALTH_WARNING,
fmt.Sprintf("unexpected health status: %v", e.HealthStatus))
}
})
// Test UDP and TCP clients.
for _, client := range []*dns.Client{
newDNSClient(false),
newDNSClient(true),
} {
// Lookup a service without matching workloads, we should receive an SOA and no answers.
questions := []string{
"empty.service.consul.",
"_empty._tcp.service.consul.",
}
for _, question := range questions {
for _, dnsType := range []uint16{dns.TypeSRV, dns.TypeA, dns.TypeAAAA} {
m := new(dns.Msg)
m.SetQuestion(question, dnsType)
in, _, err := client.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, 0, len(in.Answer), "Bad: %s", in.String())
require.Equal(t, 0, len(in.Extra), "Bad: %s", in.String())
require.Equal(t, 1, len(in.Ns), "Bad: %s", in.String())
soaRec, ok := in.Ns[0].(*dns.SOA)
require.True(t, ok, "Bad: %s", in.Ns[0].String())
require.EqualValues(t, 0, soaRec.Hdr.Ttl, "Bad: %s", in.Ns[0].String())
}
}
// Look up the service directly including all ports.
questions = []string{
"db.service.consul.",
"_db._tcp.service.consul.", // RFC 2782 query. All ports are TCP, so this should return the same result.
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)
in, _, err := client.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
// This check only runs for a TCP client because a UDP client will truncate the response.
if client.Net == "tcp" {
for portName, port := range dbWorkloadPorts {
for workloadName, workload := range dbWorkloads {
workloadTarget := fmt.Sprintf("%s.port.%s.workload.consul.", portName, workloadName)
workloadHost := workload.Addresses[0].Host
srvRec := findSrvAnswerForTarget(t, in, workloadTarget)
require.EqualValues(t, port.Port, srvRec.Port, "Bad: %s", srvRec.String())
require.EqualValues(t, 0, srvRec.Hdr.Ttl, "Bad: %s", srvRec.String())
a := findAorAAAAForName(t, in, in.Extra, workloadTarget)
require.Equal(t, workloadHost, a.AorAAAA.String(), "Bad: %s", a.Original.String())
require.EqualValues(t, 0, a.Hdr.Ttl, "Bad: %s", a.Original.String())
}
}
// Expect 1 result per port, per workload.
require.Equal(t, 9, len(in.Answer), "answer count did not match expected\n\n%s", in.String())
require.Equal(t, 9, len(in.Extra), "extra answer count did not match expected\n\n%s", in.String())
} else {
// Expect 1 result per port, per workload, up to the default limit of 3.
require.Equal(t, 3, len(in.Answer), "answer count did not match expected\n\n%s", in.String())
require.Equal(t, 3, len(in.Extra), "extra answer count did not match expected\n\n%s", in.String())
}
}
// Look up the service directly by each port.
for portName, port := range dbWorkloadPorts {
question := fmt.Sprintf("%s.port.db.service.consul.", portName)
for workloadName, workload := range dbWorkloads {
workloadTarget := fmt.Sprintf("%s.port.%s.workload.consul.", portName, workloadName)
workloadHost := workload.Addresses[0].Host
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)
in, _, err := client.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
srvRec := findSrvAnswerForTarget(t, in, workloadTarget)
require.EqualValues(t, port.Port, srvRec.Port, "Bad: %s", srvRec.String())
require.EqualValues(t, 0, srvRec.Hdr.Ttl, "Bad: %s", srvRec.String())
a := findAorAAAAForName(t, in, in.Extra, workloadTarget)
require.Equal(t, workloadHost, a.AorAAAA.String(), "Bad: %s", a.Original.String())
require.EqualValues(t, 0, a.Hdr.Ttl, "Bad: %s", a.Original.String())
// Expect 1 result per port.
require.Equal(t, 3, len(in.Answer), "answer count did not match expected\n\n%s", in.String())
require.Equal(t, 3, len(in.Extra), "extra answer count did not match expected\n\n%s", in.String())
}
}
// Look up A/AAAA by service.
questions = []string{
"db.service.consul.",
}
for _, question := range questions {
for workloadName, dnsType := range map[string]uint16{
"db-1": dns.TypeA,
"db-2": dns.TypeA,
"db-3": dns.TypeAAAA,
} {
workload := dbWorkloads[workloadName]
m := new(dns.Msg)
m.SetQuestion(question, dnsType)
in, _, err := client.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
workloadHost := workload.Addresses[0].Host
a := findAorAAAAForAddress(t, in, in.Answer, workloadHost)
require.Equal(t, question, a.Hdr.Name, "Bad: %s", a.Original.String())
require.EqualValues(t, 0, a.Hdr.Ttl, "Bad: %s", a.Original.String())
// Expect 1 answer per workload. For A records, expect 2 answers because there's 2 IPv4 workloads.
if dnsType == dns.TypeA {
require.Equal(t, 2, len(in.Answer), "answer count did not match expected\n\n%s", in.String())
} else {
require.Equal(t, 1, len(in.Answer), "answer count did not match expected\n\n%s", in.String())
}
require.Equal(t, 0, len(in.Extra), "extra answer count did not match expected\n\n%s", in.String())
}
}
// Lookup a non-existing service, we should receive an SOA.
questions = []string{
"nodb.service.consul.",
"nope.query.consul.", // prepared query is not supported in v2
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)
in, _, err := client.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, 1, len(in.Ns), "Bad: %s", in.String())
soaRec, ok := in.Ns[0].(*dns.SOA)
require.True(t, ok, "Bad: %s", in.Ns[0].String())
require.EqualValues(t, 0, soaRec.Hdr.Ttl, "Bad: %s", in.Ns[0].String())
}
// Lookup workloads directly with a port.
for workloadName, dnsType := range map[string]uint16{
"db-1": dns.TypeA,
"db-2": dns.TypeA,
"db-3": dns.TypeAAAA,
} {
for _, question := range []string{
fmt.Sprintf("%s.workload.consul.", workloadName),
fmt.Sprintf("tcp.port.%s.workload.consul.", workloadName),
fmt.Sprintf("admin.port.%s.workload.consul.", workloadName),
fmt.Sprintf("mesh.port.%s.workload.consul.", workloadName),
} {
workload := dbWorkloads[workloadName]
workloadHost := workload.Addresses[0].Host
m := new(dns.Msg)
m.SetQuestion(question, dnsType)
in, _, err := client.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, 1, len(in.Answer), "Bad: %s", in.String())
a := findAorAAAAForName(t, in, in.Answer, question)
require.Equal(t, workloadHost, a.AorAAAA.String(), "Bad: %s", a.Original.String())
require.EqualValues(t, 0, a.Hdr.Ttl, "Bad: %s", a.Original.String())
}
}
// Lookup a non-existing workload, we should receive an NXDOMAIN response.
for _, aType := range []uint16{dns.TypeA, dns.TypeAAAA} {
question := "unknown.workload.consul."
m := new(dns.Msg)
m.SetQuestion(question, aType)
in, _, err := client.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, 0, len(in.Answer), "Bad: %s", in.String())
require.Equal(t, dns.RcodeNameError, in.Rcode, "Bad: %s", in.String())
}
}
}
func findSrvAnswerForTarget(t *testing.T, in *dns.Msg, target string) *dns.SRV {
t.Helper()
for _, a := range in.Answer {
srvRec, ok := a.(*dns.SRV)
if ok && srvRec.Target == target {
return srvRec
}
}
t.Fatalf("could not find SRV record for target: %s\n\n%s", target, in.String())
return nil
}
func findAorAAAAForName(t *testing.T, in *dns.Msg, rrs []dns.RR, name string) *dnsAOrAAAA {
t.Helper()
for _, rr := range rrs {
a := newAOrAAAA(t, rr)
if a.Hdr.Name == name {
return a
}
}
t.Fatalf("could not find A/AAAA record for name: %s\n\n%+v", name, in.String())
return nil
}
func findAorAAAAForAddress(t *testing.T, in *dns.Msg, rrs []dns.RR, address string) *dnsAOrAAAA {
t.Helper()
for _, rr := range rrs {
a := newAOrAAAA(t, rr)
if a.AorAAAA.String() == address {
return a
}
}
t.Fatalf("could not find A/AAAA record for address: %s\n\n%+v", address, in.String())
return nil
}
func readResource(t retry.TestingTB, client pbresource.ResourceServiceClient, id *pbresource.ID, m proto.Message) proto.Message {
t.Helper()
retry.Run(t, func(r *retry.R) {
res, err := client.Read(context.Background(), &pbresource.ReadRequest{Id: id})
if err != nil {
r.Fatalf("err: %v", err)
}
data := res.GetResource()
require.NotEmpty(r, data)
err = data.Data.UnmarshalTo(m)
require.NoError(r, err)
})
return m
}
func toAny(t retry.TestingTB, m proto.Message) *anypb.Any {
t.Helper()
a, err := anypb.New(m)
if err != nil {
t.Fatalf("could not convert proto to `any` message: %v", err)
}
return a
}
// dnsAOrAAAA unifies A and AAAA records for simpler testing when the IP type doesn't matter.
type dnsAOrAAAA struct {
Original dns.RR
Hdr dns.RR_Header
AorAAAA net.IP
isAAAA bool
}
func newAOrAAAA(t *testing.T, rr dns.RR) *dnsAOrAAAA {
t.Helper()
if aRec, ok := rr.(*dns.A); ok {
return &dnsAOrAAAA{
Original: rr,
Hdr: aRec.Hdr,
AorAAAA: aRec.A,
isAAAA: false,
}
}
if aRec, ok := rr.(*dns.AAAA); ok {
return &dnsAOrAAAA{
Original: rr,
Hdr: aRec.Hdr,
AorAAAA: aRec.AAAA,
isAAAA: true,
}
}
t.Fatalf("Bad A or AAAA record: %#v", rr)
return nil
}
func newDNSClient(tcp bool) *dns.Client {
c := new(dns.Client)
// Use TCP to avoid truncation of larger responses and
// sidestep the default UDP size limit of 3 answers
// set by config.DefaultSource() in agent/config/default.go.
if tcp {
c.Net = "tcp"
}
return c
}