Update service manager to store centrally configured upstreams

This commit is contained in:
freddygv 2021-03-11 10:49:43 -07:00
parent 6fd30d0384
commit df1f3995f8
3 changed files with 171 additions and 14 deletions

View File

@ -383,38 +383,72 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
ns.Proxy.TransparentProxy = defaults.TransparentProxy ns.Proxy.TransparentProxy = defaults.TransparentProxy
} }
// Merge upstream defaults if there were any returned // seenUpstreams stores the upstreams seen from the local registration so that we can also add synthetic entries.
// for upstream configuration that was defined via service-defaults.UpstreamConfigs. In TransparentProxy mode
// ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with thosee values.
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
for _, us := range defaults.UpstreamIDConfigs {
parsed, err := structs.ParseUpstreamConfig(us.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err)
}
// Delete the mesh gateway key since this is the only place it is read from an opaque map.
// Later reads use Proxy.MeshGateway.
delete(us.Config, "mesh_gateway")
remoteUpstreams[us.Upstream] = structs.Upstream{
DestinationNamespace: us.Upstream.NamespaceOrDefault(),
DestinationName: us.Upstream.ID,
Config: us.Config,
MeshGateway: parsed.MeshGateway,
CentrallyConfigured: true,
}
}
localUpstreams := make(map[structs.ServiceID]struct{})
// Merge upstream defaults into the local registration
for i := range ns.Proxy.Upstreams { for i := range ns.Proxy.Upstreams {
// Get a pointer not a value copy of the upstream struct // Get a pointer not a value copy of the upstream struct
us := &ns.Proxy.Upstreams[i] us := &ns.Proxy.Upstreams[i]
if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService { if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService {
continue continue
} }
localUpstreams[us.DestinationID()] = struct{}{}
usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) usCfg, ok := remoteUpstreams[us.DestinationID()]
if !ok { if !ok {
// No config defaults to merge // No config defaults to merge
continue continue
} }
// MeshGateway mode is fetched separately since it is a first class field and not read from us.Config
parsed, err := structs.ParseUpstreamConfig(usCfg)
if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Identifier(), err)
}
// The local upstream config mode has the highest precedence, so only overwrite when it's set to the default // The local upstream config mode has the highest precedence, so only overwrite when it's set to the default
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault { if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
us.MeshGateway.Mode = parsed.MeshGateway.Mode us.MeshGateway.Mode = usCfg.MeshGateway.Mode
} }
// Delete the mesh gateway key since this is the only place it is read from an opaque map.
delete(usCfg, "mesh_gateway")
// Merge in everything else that is read from the map // Merge in everything else that is read from the map
if err := mergo.Merge(&us.Config, usCfg); err != nil { if err := mergo.Merge(&us.Config, usCfg.Config); err != nil {
return nil, err return nil, err
} }
} }
// Ensure upstreams present in central config are represented in the local configuration.
// This does not apply outside of TransparentProxy mode because in that situation every upstream needs to be defined
// explicitly and locally with a local bind port.
if ns.Proxy.TransparentProxy {
for id, remote := range remoteUpstreams {
if _, ok := localUpstreams[id]; ok {
// Remote upstream is already present locally
continue
}
ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote)
}
}
return ns, err return ns, err
} }

View File

@ -922,6 +922,125 @@ func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) {
}, },
}, },
}, },
{
name: "remote upstream config expands local upstream list in tproxy mode",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
TransparentProxy: true,
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
TransparentProxy: true,
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
Config: map[string]interface{}{
"protocol": "grpc",
},
CentrallyConfigured: true,
},
},
},
},
},
{
name: "remote upstream config not added to local upstream list outside of tproxy mode",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
TransparentProxy: false,
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
{ {
name: "upstream mode from remote defaults overrides local default", name: "upstream mode from remote defaults overrides local default",
args: args{ args: args{

View File

@ -265,6 +265,10 @@ type Upstream struct {
// an ingress gateway. This cannot and should not be set by a user, it is // an ingress gateway. This cannot and should not be set by a user, it is
// used internally to store the association of hosts to an upstream service. // used internally to store the association of hosts to an upstream service.
IngressHosts []string `json:"-" bexpr:"-"` IngressHosts []string `json:"-" bexpr:"-"`
// CentrallyConfigured indicates whether the upstream was defined in a proxy
// instance registration or whether it was generated from a config entry.
CentrallyConfigured bool
} }
func (t *Upstream) UnmarshalJSON(data []byte) (err error) { func (t *Upstream) UnmarshalJSON(data []byte) (err error) {
@ -321,7 +325,7 @@ func (u *Upstream) Validate() error {
return fmt.Errorf("upstream destination name cannot be empty") return fmt.Errorf("upstream destination name cannot be empty")
} }
if u.LocalBindPort == 0 { if u.LocalBindPort == 0 && !u.CentrallyConfigured {
return fmt.Errorf("upstream local bind port cannot be zero") return fmt.Errorf("upstream local bind port cannot be zero")
} }
return nil return nil