mirror of https://github.com/status-im/consul.git
Merge pull request #11432 from hashicorp/ap/exports-mgw
[OSS] Update mesh gateways to handle partitions
This commit is contained in:
commit
303532825f
|
@ -3786,6 +3786,8 @@ func (a *Agent) registerCache() {
|
||||||
|
|
||||||
a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName,
|
a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName,
|
||||||
&cachetype.FederationStateListMeshGateways{RPC: a})
|
&cachetype.FederationStateListMeshGateways{RPC: a})
|
||||||
|
|
||||||
|
a.registerEntCache()
|
||||||
}
|
}
|
||||||
|
|
||||||
// LocalState returns the agent's local state
|
// LocalState returns the agent's local state
|
||||||
|
|
|
@ -54,3 +54,5 @@ func (a *Agent) enterpriseStats() map[string]map[string]string {
|
||||||
func (a *Agent) AgentEnterpriseMeta() *structs.EnterpriseMeta {
|
func (a *Agent) AgentEnterpriseMeta() *structs.EnterpriseMeta {
|
||||||
return structs.NodeEnterpriseMetaInDefaultPartition()
|
return structs.NodeEnterpriseMetaInDefaultPartition()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) registerEntCache() {}
|
||||||
|
|
|
@ -24,15 +24,26 @@ func UpstreamSNI(u *structs.Upstream, subset string, dc string, trustDomain stri
|
||||||
return ServiceSNI(u.DestinationName, subset, u.DestinationNamespace, u.DestinationPartition, dc, trustDomain)
|
return ServiceSNI(u.DestinationName, subset, u.DestinationNamespace, u.DestinationPartition, dc, trustDomain)
|
||||||
}
|
}
|
||||||
|
|
||||||
func DatacenterSNI(dc string, trustDomain string) string {
|
func GatewaySNI(dc string, partition, trustDomain string) string {
|
||||||
return fmt.Sprintf("%s.internal.%s", dc, trustDomain)
|
if partition == "" {
|
||||||
|
// TODO(partitions) Make default available in OSS as a constant for uses like this one
|
||||||
|
partition = "default"
|
||||||
|
}
|
||||||
|
|
||||||
|
switch partition {
|
||||||
|
case "default":
|
||||||
|
return dotJoin(dc, internal, trustDomain)
|
||||||
|
default:
|
||||||
|
return dotJoin(partition, dc, internalVersion, trustDomain)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ServiceSNI(service string, subset string, namespace string, partition string, datacenter string, trustDomain string) string {
|
func ServiceSNI(service string, subset string, namespace string, partition string, datacenter string, trustDomain string) string {
|
||||||
if namespace == "" {
|
if namespace == "" {
|
||||||
namespace = "default"
|
namespace = structs.IntentionDefaultNamespace
|
||||||
}
|
}
|
||||||
if partition == "" {
|
if partition == "" {
|
||||||
|
// TODO(partitions) Make default available in OSS as a constant for uses like this one
|
||||||
partition = "default"
|
partition = "default"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -95,9 +95,39 @@ func TestUpstreamSNI(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDatacenterSNI(t *testing.T) {
|
func TestGatewaySNI(t *testing.T) {
|
||||||
require.Equal(t, "foo."+testTrustDomainSuffix1, DatacenterSNI("foo", testTrustDomain1))
|
type testCase struct {
|
||||||
require.Equal(t, "bar."+testTrustDomainSuffix2, DatacenterSNI("bar", testTrustDomain2))
|
name string
|
||||||
|
dc string
|
||||||
|
trustDomain string
|
||||||
|
expect string
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
got := GatewaySNI(tc.dc, "", tc.trustDomain)
|
||||||
|
require.Equal(t, tc.expect, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []testCase{
|
||||||
|
{
|
||||||
|
name: "foo in domain1",
|
||||||
|
dc: "foo",
|
||||||
|
trustDomain: "domain1",
|
||||||
|
expect: "foo.internal.domain1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "bar in domain2",
|
||||||
|
dc: "bar",
|
||||||
|
trustDomain: "domain2",
|
||||||
|
expect: "bar.internal.domain2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
run(t, c)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServiceSNI(t *testing.T) {
|
func TestServiceSNI(t *testing.T) {
|
||||||
|
|
|
@ -31,7 +31,10 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
||||||
|
|
||||||
wildcardEntMeta := s.proxyID.WithWildcardNamespace()
|
wildcardEntMeta := s.proxyID.WithWildcardNamespace()
|
||||||
|
|
||||||
// Watch for all services
|
// Watch for all services.
|
||||||
|
// Eventually we will have to watch connect enabled instances for each service as well as the
|
||||||
|
// destination services themselves but those notifications will be setup later.
|
||||||
|
// We cannot setup those watches until we know what the services are.
|
||||||
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
|
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
@ -43,45 +46,6 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
||||||
return snap, err
|
return snap, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.meta[structs.MetaWANFederationKey] == "1" {
|
|
||||||
// Conveniently we can just use this service meta attribute in one
|
|
||||||
// place here to set the machinery in motion and leave the conditional
|
|
||||||
// behavior out of the rest of the package.
|
|
||||||
err = s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
|
|
||||||
Datacenter: s.source.Datacenter,
|
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
||||||
Source: *s.source,
|
|
||||||
}, federationStateListGatewaysWatchID, s.ch)
|
|
||||||
if err != nil {
|
|
||||||
return snap, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
|
||||||
Datacenter: s.source.Datacenter,
|
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
||||||
ServiceName: structs.ConsulServiceName,
|
|
||||||
}, consulServerListWatchID, s.ch)
|
|
||||||
if err != nil {
|
|
||||||
return snap, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Eventually we will have to watch connect enable instances for each service as well as the
|
|
||||||
// destination services themselves but those notifications will be setup later. However we
|
|
||||||
// cannot setup those watches until we know what the services are. from the service list
|
|
||||||
// watch above
|
|
||||||
|
|
||||||
err = s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
|
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
|
|
||||||
}, datacentersWatchID, s.ch)
|
|
||||||
if err != nil {
|
|
||||||
return snap, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Once we start getting notified about the datacenters we will setup watches on the
|
|
||||||
// gateways within those other datacenters. We cannot do that here because we don't
|
|
||||||
// know what they are yet.
|
|
||||||
|
|
||||||
// Watch service-resolvers so we can setup service subset clusters
|
// Watch service-resolvers so we can setup service subset clusters
|
||||||
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
|
@ -95,17 +59,66 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
||||||
return snap, err
|
return snap, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.proxyID.InDefaultPartition() {
|
||||||
|
if err := s.initializeCrossDCWatches(ctx); err != nil {
|
||||||
|
return snap, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.initializeEntWatches(ctx); err != nil {
|
||||||
|
return snap, err
|
||||||
|
}
|
||||||
|
|
||||||
snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
|
snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
|
||||||
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
|
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
|
||||||
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
|
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
|
||||||
snap.MeshGateway.GatewayGroups = make(map[string]structs.CheckServiceNodes)
|
snap.MeshGateway.GatewayGroups = make(map[string]structs.CheckServiceNodes)
|
||||||
snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
|
snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
|
||||||
snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes)
|
snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes)
|
||||||
|
|
||||||
// there is no need to initialize the map of service resolvers as we
|
// there is no need to initialize the map of service resolvers as we
|
||||||
// fully rebuild it every time we get updates
|
// fully rebuild it every time we get updates
|
||||||
return snap, err
|
return snap, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error {
|
||||||
|
if s.meta[structs.MetaWANFederationKey] == "1" {
|
||||||
|
// Conveniently we can just use this service meta attribute in one
|
||||||
|
// place here to set the machinery in motion and leave the conditional
|
||||||
|
// behavior out of the rest of the package.
|
||||||
|
err := s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
|
||||||
|
Datacenter: s.source.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
Source: *s.source,
|
||||||
|
}, federationStateListGatewaysWatchID, s.ch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: s.source.Datacenter,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
|
ServiceName: structs.ConsulServiceName,
|
||||||
|
}, consulServerListWatchID, s.ch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
|
||||||
|
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
|
||||||
|
}, datacentersWatchID, s.ch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Once we start getting notified about the datacenters we will setup watches on the
|
||||||
|
// gateways within those other datacenters. We cannot do that here because we don't
|
||||||
|
// know what they are yet.
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||||
if u.Err != nil {
|
if u.Err != nil {
|
||||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||||
|
@ -120,6 +133,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
||||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
}
|
}
|
||||||
snap.Roots = roots
|
snap.Roots = roots
|
||||||
|
|
||||||
case federationStateListGatewaysWatchID:
|
case federationStateListGatewaysWatchID:
|
||||||
dcIndexedNodes, ok := u.Result.(*structs.DatacenterIndexedCheckServiceNodes)
|
dcIndexedNodes, ok := u.Result.(*structs.DatacenterIndexedCheckServiceNodes)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -181,8 +195,8 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
||||||
cancelFn()
|
cancelFn()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
snap.MeshGateway.WatchedServicesSet = true
|
snap.MeshGateway.WatchedServicesSet = true
|
||||||
|
|
||||||
case datacentersWatchID:
|
case datacentersWatchID:
|
||||||
datacentersRaw, ok := u.Result.(*[]string)
|
datacentersRaw, ok := u.Result.(*[]string)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -199,7 +213,10 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := snap.MeshGateway.WatchedDatacenters[dc]; !ok {
|
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||||
|
gk := GatewayKey{Datacenter: dc, Partition: entMeta.PartitionOrDefault()}
|
||||||
|
|
||||||
|
if _, ok := snap.MeshGateway.WatchedGateways[gk.String()]; !ok {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||||
Datacenter: dc,
|
Datacenter: dc,
|
||||||
|
@ -207,36 +224,42 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
||||||
ServiceKind: structs.ServiceKindMeshGateway,
|
ServiceKind: structs.ServiceKindMeshGateway,
|
||||||
UseServiceKind: true,
|
UseServiceKind: true,
|
||||||
Source: *s.source,
|
Source: *s.source,
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
EnterpriseMeta: *entMeta,
|
||||||
}, fmt.Sprintf("mesh-gateway:%s", dc), s.ch)
|
}, fmt.Sprintf("mesh-gateway:%s", gk.String()), s.ch)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
meshLogger.Error("failed to register watch for mesh-gateway",
|
meshLogger.Error("failed to register watch for mesh-gateway",
|
||||||
"datacenter", dc,
|
"datacenter", dc,
|
||||||
|
"partition", entMeta.PartitionOrDefault(),
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
cancel()
|
cancel()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
snap.MeshGateway.WatchedGateways[gk.String()] = cancel
|
||||||
snap.MeshGateway.WatchedDatacenters[dc] = cancel
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for dc, cancelFn := range snap.MeshGateway.WatchedDatacenters {
|
for key, cancelFn := range snap.MeshGateway.WatchedGateways {
|
||||||
|
gk := gatewayKeyFromString(key)
|
||||||
|
if gk.Datacenter == s.source.Datacenter {
|
||||||
|
// Only cross-DC watches are managed by the datacenters watch.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
found := false
|
found := false
|
||||||
for _, dcCurrent := range datacenters {
|
for _, dcCurrent := range datacenters {
|
||||||
if dcCurrent == dc {
|
if dcCurrent == gk.Datacenter {
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !found {
|
if !found {
|
||||||
delete(snap.MeshGateway.WatchedDatacenters, dc)
|
delete(snap.MeshGateway.WatchedGateways, key)
|
||||||
cancelFn()
|
cancelFn()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case serviceResolversWatchID:
|
case serviceResolversWatchID:
|
||||||
configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
|
configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -286,23 +309,27 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
||||||
} else if _, ok := snap.MeshGateway.ServiceGroups[sn]; ok {
|
} else if _, ok := snap.MeshGateway.ServiceGroups[sn]; ok {
|
||||||
delete(snap.MeshGateway.ServiceGroups, sn)
|
delete(snap.MeshGateway.ServiceGroups, sn)
|
||||||
}
|
}
|
||||||
|
|
||||||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||||
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
}
|
}
|
||||||
|
|
||||||
dc := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
key := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||||
delete(snap.MeshGateway.GatewayGroups, dc)
|
delete(snap.MeshGateway.GatewayGroups, key)
|
||||||
delete(snap.MeshGateway.HostnameDatacenters, dc)
|
delete(snap.MeshGateway.HostnameDatacenters, key)
|
||||||
|
|
||||||
if len(resp.Nodes) > 0 {
|
if len(resp.Nodes) > 0 {
|
||||||
snap.MeshGateway.GatewayGroups[dc] = resp.Nodes
|
snap.MeshGateway.GatewayGroups[key] = resp.Nodes
|
||||||
snap.MeshGateway.HostnameDatacenters[dc] = hostnameEndpoints(
|
snap.MeshGateway.HostnameDatacenters[key] = hostnameEndpoints(
|
||||||
s.logger.Named(logging.MeshGateway), snap.Datacenter, resp.Nodes)
|
s.logger.Named(logging.MeshGateway), snap.Datacenter, resp.Nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// do nothing for now
|
if err := s.handleEntUpdate(meshLogger, ctx, u, snap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
// +build !consulent
|
||||||
|
|
||||||
|
package proxycfg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *handlerMeshGateway) initializeEntWatches(_ context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *handlerMeshGateway) handleEntUpdate(_ hclog.Logger, _ context.Context, _ cache.UpdateEvent, _ *ConfigSnapshot) error {
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -60,19 +60,28 @@ type GatewayKey struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k GatewayKey) String() string {
|
func (k GatewayKey) String() string {
|
||||||
return k.Partition + "." + k.Datacenter
|
resp := k.Datacenter
|
||||||
|
if k.Partition != "" {
|
||||||
|
resp = k.Partition + "." + resp
|
||||||
|
}
|
||||||
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k GatewayKey) IsEmpty() bool {
|
func (k GatewayKey) IsEmpty() bool {
|
||||||
return k.Partition == "" && k.Datacenter == ""
|
return k.Partition == "" && k.Datacenter == ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k GatewayKey) Matches(dc, partition string) bool {
|
||||||
|
return structs.EqualPartitions(k.Partition, partition) && k.Datacenter == dc
|
||||||
|
}
|
||||||
|
|
||||||
func gatewayKeyFromString(s string) GatewayKey {
|
func gatewayKeyFromString(s string) GatewayKey {
|
||||||
split := strings.SplitN(s, ".", 2)
|
split := strings.SplitN(s, ".", 2)
|
||||||
return GatewayKey{
|
|
||||||
Partition: split[0],
|
if len(split) == 1 {
|
||||||
Datacenter: split[1],
|
return GatewayKey{Datacenter: split[0]}
|
||||||
}
|
}
|
||||||
|
return GatewayKey{Partition: split[0], Datacenter: split[1]}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServicePassthroughAddrs contains the LAN addrs
|
// ServicePassthroughAddrs contains the LAN addrs
|
||||||
|
@ -256,10 +265,10 @@ type configSnapshotMeshGateway struct {
|
||||||
// health check to pass.
|
// health check to pass.
|
||||||
WatchedServicesSet bool
|
WatchedServicesSet bool
|
||||||
|
|
||||||
// WatchedDatacenters is a map of datacenter name to a cancel function.
|
// WatchedGateways is a map of GatewayKeys to a cancel function.
|
||||||
// This cancel function is tied to the watch of mesh-gateway services in
|
// This cancel function is tied to the watch of mesh-gateway services in
|
||||||
// that datacenter.
|
// that datacenter/partition.
|
||||||
WatchedDatacenters map[string]context.CancelFunc
|
WatchedGateways map[string]context.CancelFunc
|
||||||
|
|
||||||
// ServiceGroups is a map of service name to the service instances of that
|
// ServiceGroups is a map of service name to the service instances of that
|
||||||
// service in the local datacenter.
|
// service in the local datacenter.
|
||||||
|
@ -285,7 +294,7 @@ type configSnapshotMeshGateway struct {
|
||||||
HostnameDatacenters map[string]structs.CheckServiceNodes
|
HostnameDatacenters map[string]structs.CheckServiceNodes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configSnapshotMeshGateway) Datacenters() []string {
|
func (c *configSnapshotMeshGateway) GatewayKeys() []GatewayKey {
|
||||||
sz1, sz2 := len(c.GatewayGroups), len(c.FedStateGateways)
|
sz1, sz2 := len(c.GatewayGroups), len(c.FedStateGateways)
|
||||||
|
|
||||||
sz := sz1
|
sz := sz1
|
||||||
|
@ -293,20 +302,25 @@ func (c *configSnapshotMeshGateway) Datacenters() []string {
|
||||||
sz = sz2
|
sz = sz2
|
||||||
}
|
}
|
||||||
|
|
||||||
dcs := make([]string, 0, sz)
|
keys := make([]GatewayKey, 0, sz)
|
||||||
for dc := range c.GatewayGroups {
|
for key := range c.GatewayGroups {
|
||||||
dcs = append(dcs, dc)
|
keys = append(keys, gatewayKeyFromString(key))
|
||||||
}
|
}
|
||||||
for dc := range c.FedStateGateways {
|
for key := range c.FedStateGateways {
|
||||||
if _, ok := c.GatewayGroups[dc]; !ok {
|
if _, ok := c.GatewayGroups[key]; !ok {
|
||||||
dcs = append(dcs, dc)
|
keys = append(keys, gatewayKeyFromString(key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Always sort the results to ensure we generate deterministic things over
|
// Always sort the results to ensure we generate deterministic things over
|
||||||
// xDS, such as mesh-gateway listener filter chains.
|
// xDS, such as mesh-gateway listener filter chains.
|
||||||
sort.Strings(dcs)
|
sort.Slice(keys, func(i, j int) bool {
|
||||||
return dcs
|
if keys[i].Datacenter != keys[j].Datacenter {
|
||||||
|
return keys[i].Datacenter < keys[j].Datacenter
|
||||||
|
}
|
||||||
|
return keys[i].Partition < keys[j].Partition
|
||||||
|
})
|
||||||
|
return keys
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configSnapshotMeshGateway) IsEmpty() bool {
|
func (c *configSnapshotMeshGateway) IsEmpty() bool {
|
||||||
|
@ -315,7 +329,7 @@ func (c *configSnapshotMeshGateway) IsEmpty() bool {
|
||||||
}
|
}
|
||||||
return len(c.WatchedServices) == 0 &&
|
return len(c.WatchedServices) == 0 &&
|
||||||
!c.WatchedServicesSet &&
|
!c.WatchedServicesSet &&
|
||||||
len(c.WatchedDatacenters) == 0 &&
|
len(c.WatchedGateways) == 0 &&
|
||||||
len(c.ServiceGroups) == 0 &&
|
len(c.ServiceGroups) == 0 &&
|
||||||
len(c.ServiceResolvers) == 0 &&
|
len(c.ServiceResolvers) == 0 &&
|
||||||
len(c.GatewayGroups) == 0 &&
|
len(c.GatewayGroups) == 0 &&
|
||||||
|
@ -466,7 +480,7 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
|
||||||
snap.TerminatingGateway.WatchedConfigs = nil
|
snap.TerminatingGateway.WatchedConfigs = nil
|
||||||
snap.TerminatingGateway.WatchedResolvers = nil
|
snap.TerminatingGateway.WatchedResolvers = nil
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
snap.MeshGateway.WatchedDatacenters = nil
|
snap.MeshGateway.WatchedGateways = nil
|
||||||
snap.MeshGateway.WatchedServices = nil
|
snap.MeshGateway.WatchedServices = nil
|
||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
snap.IngressGateway.WatchedUpstreams = nil
|
snap.IngressGateway.WatchedUpstreams = nil
|
||||||
|
|
|
@ -750,7 +750,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
require.Equal(t, indexedRoots, snap.Roots)
|
require.Equal(t, indexedRoots, snap.Roots)
|
||||||
require.Empty(t, snap.MeshGateway.WatchedServices)
|
require.Empty(t, snap.MeshGateway.WatchedServices)
|
||||||
require.False(t, snap.MeshGateway.WatchedServicesSet)
|
require.False(t, snap.MeshGateway.WatchedServicesSet)
|
||||||
require.Empty(t, snap.MeshGateway.WatchedDatacenters)
|
require.Empty(t, snap.MeshGateway.WatchedGateways)
|
||||||
require.Empty(t, snap.MeshGateway.ServiceGroups)
|
require.Empty(t, snap.MeshGateway.ServiceGroups)
|
||||||
require.Empty(t, snap.MeshGateway.ServiceResolvers)
|
require.Empty(t, snap.MeshGateway.ServiceResolvers)
|
||||||
require.Empty(t, snap.MeshGateway.GatewayGroups)
|
require.Empty(t, snap.MeshGateway.GatewayGroups)
|
||||||
|
@ -772,7 +772,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
require.Equal(t, indexedRoots, snap.Roots)
|
require.Equal(t, indexedRoots, snap.Roots)
|
||||||
require.Empty(t, snap.MeshGateway.WatchedServices)
|
require.Empty(t, snap.MeshGateway.WatchedServices)
|
||||||
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
||||||
require.Empty(t, snap.MeshGateway.WatchedDatacenters)
|
require.Empty(t, snap.MeshGateway.WatchedGateways)
|
||||||
require.Empty(t, snap.MeshGateway.ServiceGroups)
|
require.Empty(t, snap.MeshGateway.ServiceGroups)
|
||||||
require.Empty(t, snap.MeshGateway.ServiceResolvers)
|
require.Empty(t, snap.MeshGateway.ServiceResolvers)
|
||||||
require.Empty(t, snap.MeshGateway.GatewayGroups)
|
require.Empty(t, snap.MeshGateway.GatewayGroups)
|
||||||
|
|
|
@ -1542,8 +1542,8 @@ func testConfigSnapshotMeshGateway(t testing.T, populateServices bool, useFedera
|
||||||
structs.NewServiceName("bar", nil): nil,
|
structs.NewServiceName("bar", nil): nil,
|
||||||
},
|
},
|
||||||
WatchedServicesSet: true,
|
WatchedServicesSet: true,
|
||||||
WatchedDatacenters: map[string]context.CancelFunc{
|
WatchedGateways: map[string]context.CancelFunc{
|
||||||
"dc2": nil,
|
"default.dc2": nil,
|
||||||
},
|
},
|
||||||
ServiceGroups: map[structs.ServiceName]structs.CheckServiceNodes{
|
ServiceGroups: map[structs.ServiceName]structs.CheckServiceNodes{
|
||||||
structs.NewServiceName("foo", nil): TestGatewayServiceGroupFooDC1(t),
|
structs.NewServiceName("foo", nil): TestGatewayServiceGroupFooDC1(t),
|
||||||
|
|
|
@ -202,40 +202,43 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message,
|
||||||
// for a mesh gateway. This will include 1 cluster per remote datacenter as well as
|
// for a mesh gateway. This will include 1 cluster per remote datacenter as well as
|
||||||
// 1 cluster for each service subset.
|
// 1 cluster for each service subset.
|
||||||
func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
datacenters := cfgSnap.MeshGateway.Datacenters()
|
keys := cfgSnap.MeshGateway.GatewayKeys()
|
||||||
|
|
||||||
// 1 cluster per remote dc + 1 cluster per local service (this is a lower bound - all subset specific clusters will be appended)
|
// 1 cluster per remote dc/partition + 1 cluster per local service (this is a lower bound - all subset specific clusters will be appended)
|
||||||
clusters := make([]proto.Message, 0, len(datacenters)+len(cfgSnap.MeshGateway.ServiceGroups))
|
clusters := make([]proto.Message, 0, len(keys)+len(cfgSnap.MeshGateway.ServiceGroups))
|
||||||
|
|
||||||
// generate the remote dc clusters
|
// Generate the remote clusters
|
||||||
for _, dc := range datacenters {
|
for _, key := range keys {
|
||||||
if dc == cfgSnap.Datacenter {
|
if key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrEmpty()) {
|
||||||
continue // skip local
|
continue // skip local
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := gatewayClusterOpts{
|
opts := gatewayClusterOpts{
|
||||||
name: connect.DatacenterSNI(dc, cfgSnap.Roots.TrustDomain),
|
name: connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain),
|
||||||
hostnameEndpoints: cfgSnap.MeshGateway.HostnameDatacenters[dc],
|
hostnameEndpoints: cfgSnap.MeshGateway.HostnameDatacenters[key.String()],
|
||||||
isRemote: dc != cfgSnap.Datacenter,
|
isRemote: key.Datacenter != cfgSnap.Datacenter,
|
||||||
}
|
}
|
||||||
cluster := s.makeGatewayCluster(cfgSnap, opts)
|
cluster := s.makeGatewayCluster(cfgSnap, opts)
|
||||||
clusters = append(clusters, cluster)
|
clusters = append(clusters, cluster)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
|
if cfgSnap.ProxyID.InDefaultPartition() &&
|
||||||
|
cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" &&
|
||||||
|
cfgSnap.ServerSNIFn != nil {
|
||||||
|
|
||||||
// Add all of the remote wildcard datacenter mappings for servers.
|
// Add all of the remote wildcard datacenter mappings for servers.
|
||||||
for _, dc := range datacenters {
|
for _, key := range keys {
|
||||||
hostnameEndpoints := cfgSnap.MeshGateway.HostnameDatacenters[dc]
|
hostnameEndpoints := cfgSnap.MeshGateway.HostnameDatacenters[key.String()]
|
||||||
|
|
||||||
// If the DC is our current DC then this cluster is for traffic from a remote DC to a local server.
|
// If the DC is our current DC then this cluster is for traffic from a remote DC to a local server.
|
||||||
// HostnameDatacenters is populated with gateway addresses, so it does not apply here.
|
// HostnameDatacenters is populated with gateway addresses, so it does not apply here.
|
||||||
if dc == cfgSnap.Datacenter {
|
if key.Datacenter == cfgSnap.Datacenter {
|
||||||
hostnameEndpoints = nil
|
hostnameEndpoints = nil
|
||||||
}
|
}
|
||||||
opts := gatewayClusterOpts{
|
opts := gatewayClusterOpts{
|
||||||
name: cfgSnap.ServerSNIFn(dc, ""),
|
name: cfgSnap.ServerSNIFn(key.Datacenter, ""),
|
||||||
hostnameEndpoints: hostnameEndpoints,
|
hostnameEndpoints: hostnameEndpoints,
|
||||||
isRemote: dc != cfgSnap.Datacenter,
|
isRemote: key.Datacenter != cfgSnap.Datacenter,
|
||||||
}
|
}
|
||||||
cluster := s.makeGatewayCluster(cfgSnap, opts)
|
cluster := s.makeGatewayCluster(cfgSnap, opts)
|
||||||
clusters = append(clusters, cluster)
|
clusters = append(clusters, cluster)
|
||||||
|
|
|
@ -3,6 +3,7 @@ package xds
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||||
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||||||
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
||||||
|
@ -109,28 +110,30 @@ func (s *ResourceGenerator) endpointsFromSnapshotTerminatingGateway(cfgSnap *pro
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
datacenters := cfgSnap.MeshGateway.Datacenters()
|
keys := cfgSnap.MeshGateway.GatewayKeys()
|
||||||
resources := make([]proto.Message, 0, len(datacenters)+len(cfgSnap.MeshGateway.ServiceGroups))
|
resources := make([]proto.Message, 0, len(keys)+len(cfgSnap.MeshGateway.ServiceGroups))
|
||||||
|
|
||||||
// generate the endpoints for the gateways in the remote datacenters
|
for _, key := range keys {
|
||||||
for _, dc := range datacenters {
|
if key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrEmpty()) {
|
||||||
// Skip creating endpoints for mesh gateways in local DC and gateways in remote DCs with a hostname as their address
|
continue // skip local
|
||||||
// EDS cannot resolve hostnames so we provide them through CDS instead
|
}
|
||||||
if dc == cfgSnap.Datacenter || len(cfgSnap.MeshGateway.HostnameDatacenters[dc]) > 0 {
|
// Also skip gateways with a hostname as their address. EDS cannot resolve hostnames,
|
||||||
|
// so we provide them through CDS instead.
|
||||||
|
if len(cfgSnap.MeshGateway.HostnameDatacenters[key.String()]) > 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
endpoints, ok := cfgSnap.MeshGateway.GatewayGroups[dc]
|
endpoints, ok := cfgSnap.MeshGateway.GatewayGroups[key.String()]
|
||||||
if !ok {
|
if !ok {
|
||||||
endpoints, ok = cfgSnap.MeshGateway.FedStateGateways[dc]
|
endpoints, ok = cfgSnap.MeshGateway.FedStateGateways[key.String()]
|
||||||
if !ok { // not possible
|
if !ok { // not possible
|
||||||
s.Logger.Error("skipping mesh gateway endpoints because no definition found", "datacenter", dc)
|
s.Logger.Error("skipping mesh gateway endpoints because no definition found", "datacenter", key)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // standard connect
|
{ // standard connect
|
||||||
clusterName := connect.DatacenterSNI(dc, cfgSnap.Roots.TrustDomain)
|
clusterName := connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain)
|
||||||
|
|
||||||
la := makeLoadAssignment(
|
la := makeLoadAssignment(
|
||||||
clusterName,
|
clusterName,
|
||||||
|
@ -142,9 +145,11 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
||||||
resources = append(resources, la)
|
resources = append(resources, la)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
|
if cfgSnap.ProxyID.InDefaultPartition() &&
|
||||||
clusterName := cfgSnap.ServerSNIFn(dc, "")
|
cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" &&
|
||||||
|
cfgSnap.ServerSNIFn != nil {
|
||||||
|
|
||||||
|
clusterName := cfgSnap.ServerSNIFn(key.Datacenter, "")
|
||||||
la := makeLoadAssignment(
|
la := makeLoadAssignment(
|
||||||
clusterName,
|
clusterName,
|
||||||
[]loadAssignmentEndpointGroup{
|
[]loadAssignmentEndpointGroup{
|
||||||
|
@ -157,7 +162,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
||||||
}
|
}
|
||||||
|
|
||||||
// generate endpoints for our servers if WAN federation is enabled
|
// generate endpoints for our servers if WAN federation is enabled
|
||||||
if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
|
if cfgSnap.ProxyID.InDefaultPartition() &&
|
||||||
|
cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" &&
|
||||||
|
cfgSnap.ServerSNIFn != nil {
|
||||||
var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint
|
var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint
|
||||||
|
|
||||||
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
|
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
|
||||||
|
|
|
@ -1135,15 +1135,15 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
|
||||||
l := makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_UNSPECIFIED)
|
l := makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_UNSPECIFIED)
|
||||||
l.ListenerFilters = []*envoy_listener_v3.ListenerFilter{tlsInspector}
|
l.ListenerFilters = []*envoy_listener_v3.ListenerFilter{tlsInspector}
|
||||||
|
|
||||||
// TODO (mesh-gateway) - Do we need to create clusters for all the old trust domains as well?
|
// We need 1 Filter Chain per remote cluster
|
||||||
// We need 1 Filter Chain per datacenter
|
keys := cfgSnap.MeshGateway.GatewayKeys()
|
||||||
datacenters := cfgSnap.MeshGateway.Datacenters()
|
for _, key := range keys {
|
||||||
for _, dc := range datacenters {
|
if key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrEmpty()) {
|
||||||
if dc == cfgSnap.Datacenter {
|
|
||||||
continue // skip local
|
continue // skip local
|
||||||
}
|
}
|
||||||
clusterName := connect.DatacenterSNI(dc, cfgSnap.Roots.TrustDomain)
|
|
||||||
filterName := fmt.Sprintf("%s.%s", name, dc)
|
clusterName := connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain)
|
||||||
|
filterName := fmt.Sprintf("%s.%s", name, key.String())
|
||||||
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_remote.")
|
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_remote.")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1159,13 +1159,16 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
|
if cfgSnap.ProxyID.InDefaultPartition() &&
|
||||||
for _, dc := range datacenters {
|
cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" &&
|
||||||
if dc == cfgSnap.Datacenter {
|
cfgSnap.ServerSNIFn != nil {
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
if key.Datacenter == cfgSnap.Datacenter {
|
||||||
continue // skip local
|
continue // skip local
|
||||||
}
|
}
|
||||||
clusterName := cfgSnap.ServerSNIFn(dc, "")
|
clusterName := cfgSnap.ServerSNIFn(key.Datacenter, "")
|
||||||
filterName := fmt.Sprintf("%s.%s", name, dc)
|
filterName := fmt.Sprintf("%s.%s", name, key.String())
|
||||||
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_remote.")
|
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_remote.")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in New Issue