peering: ensure that merged central configs of peered upstreams for partitioned downstreams work (#17179)

Partitioned downstreams with peered upstreams could not properly merge central config info (i.e. proxy-defaults and service-defaults things like mesh gateway modes) if the upstream had an empty DestinationPartition field in Enterprise.

Due to data flow, if this setup is done using Consul client agents the field is never empty and thus does not experience the bug.

When a service is registered directly to the catalog as is the case for consul-dataplane use this field may be empty and and the internal machinery of the merging function doesn't handle this well.

This PR ensures the internal machinery of that function is referentially self-consistent.
This commit is contained in:
R.B. Boyer 2023-04-28 12:36:08 -05:00 committed by GitHub
parent 1037bf7f69
commit 6b4986907d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 237 additions and 5 deletions

3
.changelog/17179.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
peering: ensure that merged central configs of peered upstreams for partitioned downstreams work
```

View File

@ -161,17 +161,31 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
// remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs). // remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs).
remoteUpstreams := make(map[structs.PeeredServiceName]structs.Upstream) remoteUpstreams := make(map[structs.PeeredServiceName]structs.Upstream)
// If the arguments did not fully normalize tenancy stuff, take care of that now.
entMeta := ns.EnterpriseMeta
entMeta.Normalize()
for _, us := range defaults.UpstreamConfigs { for _, us := range defaults.UpstreamConfigs {
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config) parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err) return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err)
} }
remoteUpstreams[us.Upstream] = structs.Upstream{ // If the defaults did not fully normalize tenancy stuff, take care of
DestinationNamespace: us.Upstream.ServiceName.NamespaceOrDefault(), // that now too.
DestinationPartition: us.Upstream.ServiceName.PartitionOrDefault(), psn := us.Upstream // only normalize the copy
DestinationName: us.Upstream.ServiceName.Name, psn.ServiceName.EnterpriseMeta.Normalize()
DestinationPeer: us.Upstream.Peer,
// Normalize the partition field specially.
if psn.Peer != "" {
psn.ServiceName.OverridePartition(entMeta.PartitionOrDefault())
}
remoteUpstreams[psn] = structs.Upstream{
DestinationNamespace: psn.ServiceName.NamespaceOrDefault(),
DestinationPartition: psn.ServiceName.PartitionOrDefault(),
DestinationName: psn.ServiceName.Name,
DestinationPeer: psn.Peer,
Config: us.Config, Config: us.Config,
MeshGateway: parsed.MeshGateway, MeshGateway: parsed.MeshGateway,
CentrallyConfigured: true, CentrallyConfigured: true,
@ -192,6 +206,12 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
} }
uid := us.DestinationID() uid := us.DestinationID()
// Normalize the partition field specially.
if uid.Peer != "" {
uid.ServiceName.OverridePartition(entMeta.PartitionOrDefault())
}
localUpstreams[uid] = struct{}{} localUpstreams[uid] = struct{}{}
remoteCfg, ok := remoteUpstreams[uid] remoteCfg, ok := remoteUpstreams[uid]
if !ok { if !ok {

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -275,6 +276,214 @@ func Test_MergeServiceConfig_Extensions(t *testing.T) {
} }
} }
func isEnterprise() bool {
return acl.PartitionOrDefault("") == "default"
}
func Test_MergeServiceConfig_peeredCentralDefaultsMerging(t *testing.T) {
partitions := []string{"default"}
if isEnterprise() {
partitions = append(partitions, "part1")
}
const peerName = "my-peer"
newDefaults := func(partition string) *structs.ServiceConfigResponse {
// client agents
return &structs.ServiceConfigResponse{
ProxyConfig: map[string]any{
"protocol": "http",
},
UpstreamConfigs: []structs.OpaqueUpstreamConfig{
{
Upstream: structs.PeeredServiceName{
ServiceName: structs.ServiceName{
Name: "*",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(partition, "*"),
},
},
Config: map[string]any{
"mesh_gateway": map[string]any{
"Mode": "local",
},
"protocol": "http",
},
},
{
Upstream: structs.PeeredServiceName{
ServiceName: structs.ServiceName{
Name: "static-server",
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(partition, "default"),
},
Peer: peerName,
},
Config: map[string]any{
"mesh_gateway": map[string]any{
"Mode": "local",
},
"protocol": "http",
},
},
},
MeshGateway: structs.MeshGatewayConfig{
Mode: "local",
},
}
}
for _, partition := range partitions {
t.Run("partition="+partition, func(t *testing.T) {
t.Run("clients", func(t *testing.T) {
defaults := newDefaults(partition)
service := &structs.NodeService{
Kind: "connect-proxy",
ID: "static-client-sidecar-proxy",
Service: "static-client-sidecar-proxy",
Address: "",
Port: 21000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "static-client",
DestinationServiceID: "static-client",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8080,
Upstreams: []structs.Upstream{
{
DestinationType: "service",
DestinationNamespace: "default",
DestinationPartition: partition,
DestinationPeer: peerName,
DestinationName: "static-server",
LocalBindAddress: "0.0.0.0",
LocalBindPort: 5000,
},
},
},
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(partition, "default"),
}
expect := &structs.NodeService{
Kind: "connect-proxy",
ID: "static-client-sidecar-proxy",
Service: "static-client-sidecar-proxy",
Address: "",
Port: 21000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "static-client",
DestinationServiceID: "static-client",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8080,
Config: map[string]any{
"protocol": "http",
},
Upstreams: []structs.Upstream{
{
DestinationType: "service",
DestinationNamespace: "default",
DestinationPartition: partition,
DestinationPeer: peerName,
DestinationName: "static-server",
LocalBindAddress: "0.0.0.0",
LocalBindPort: 5000,
MeshGateway: structs.MeshGatewayConfig{
Mode: "local",
},
Config: map[string]any{},
},
},
MeshGateway: structs.MeshGatewayConfig{
Mode: "local",
},
},
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(partition, "default"),
}
got, err := MergeServiceConfig(defaults, service)
require.NoError(t, err)
require.Equal(t, expect, got)
})
t.Run("dataplanes", func(t *testing.T) {
defaults := newDefaults(partition)
service := &structs.NodeService{
Kind: "connect-proxy",
ID: "static-client-sidecar-proxy",
Service: "static-client-sidecar-proxy",
Address: "10.61.57.9",
TaggedAddresses: map[string]structs.ServiceAddress{
"consul-virtual": {
Address: "240.0.0.2",
Port: 20000,
},
},
Port: 20000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "static-client",
DestinationServiceID: "static-client",
LocalServicePort: 8080,
Upstreams: []structs.Upstream{
{
DestinationType: "",
DestinationNamespace: "default",
DestinationPeer: peerName,
DestinationName: "static-server",
LocalBindAddress: "0.0.0.0",
LocalBindPort: 5000,
},
},
},
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(partition, "default"),
}
expect := &structs.NodeService{
Kind: "connect-proxy",
ID: "static-client-sidecar-proxy",
Service: "static-client-sidecar-proxy",
Address: "10.61.57.9",
TaggedAddresses: map[string]structs.ServiceAddress{
"consul-virtual": {
Address: "240.0.0.2",
Port: 20000,
},
},
Port: 20000,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "static-client",
DestinationServiceID: "static-client",
LocalServicePort: 8080,
Config: map[string]any{
"protocol": "http",
},
Upstreams: []structs.Upstream{
{
DestinationType: "",
DestinationNamespace: "default",
DestinationPeer: peerName,
DestinationName: "static-server",
LocalBindAddress: "0.0.0.0",
LocalBindPort: 5000,
MeshGateway: structs.MeshGatewayConfig{
Mode: "local", // This field vanishes if the merging does not work for dataplanes.
},
Config: map[string]any{},
},
},
MeshGateway: structs.MeshGatewayConfig{
Mode: "local",
},
},
EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(partition, "default"),
}
got, err := MergeServiceConfig(defaults, service)
require.NoError(t, err)
require.Equal(t, expect, got)
})
})
}
}
func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) { func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
type args struct { type args struct {
defaults *structs.ServiceConfigResponse defaults *structs.ServiceConfigResponse