mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
Configure sidecars to watch gateways in partitions
Previously the datacenter of the gateway was the key identifier, now it is the datacenter and partition. When dialing services in other partitions or datacenters we now watch the appropriate partition.
This commit is contained in:
parent
eacb73cb78
commit
62e0fc62c1
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/mitchellh/copystructure"
|
||||
|
||||
@ -38,11 +39,11 @@ type ConfigSnapshotUpstreams struct {
|
||||
WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes
|
||||
|
||||
// WatchedGateways is a map of upstream.Identifier() -> (map of
|
||||
// TargetID -> CancelFunc) in order to cancel watches for mesh gateways
|
||||
// GatewayKey.String() -> CancelFunc) in order to cancel watches for mesh gateways
|
||||
WatchedGateways map[string]map[string]context.CancelFunc
|
||||
|
||||
// WatchedGatewayEndpoints is a map of upstream.Identifier() -> (map of
|
||||
// TargetID -> CheckServiceNodes) and is used to determine the backing
|
||||
// GatewayKey.String() -> CheckServiceNodes) and is used to determine the backing
|
||||
// endpoints of a mesh gateway.
|
||||
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
|
||||
|
||||
@ -53,6 +54,27 @@ type ConfigSnapshotUpstreams struct {
|
||||
PassthroughUpstreams map[string]ServicePassthroughAddrs
|
||||
}
|
||||
|
||||
type GatewayKey struct {
|
||||
Datacenter string
|
||||
Partition string
|
||||
}
|
||||
|
||||
func (k GatewayKey) String() string {
|
||||
return k.Partition + "." + k.Datacenter
|
||||
}
|
||||
|
||||
func (k GatewayKey) IsEmpty() bool {
|
||||
return k.Partition == "" && k.Datacenter == ""
|
||||
}
|
||||
|
||||
func gatewayKeyFromString(s string) GatewayKey {
|
||||
split := strings.Split(s, ".")
|
||||
return GatewayKey{
|
||||
Partition: split[0],
|
||||
Datacenter: split[1],
|
||||
}
|
||||
}
|
||||
|
||||
// ServicePassthroughAddrs contains the LAN addrs
|
||||
type ServicePassthroughAddrs struct {
|
||||
// SNI is the Service SNI of the upstream.
|
||||
|
@ -3,6 +3,7 @@ package proxycfg
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"time"
|
||||
@ -426,3 +427,23 @@ func hostnameEndpoints(logger hclog.Logger, localDC string, nodes structs.CheckS
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
type gatewayWatchOpts struct {
|
||||
notifier CacheNotifier
|
||||
notifyCh chan cache.UpdateEvent
|
||||
source structs.QuerySource
|
||||
token string
|
||||
key GatewayKey
|
||||
upstreamID string
|
||||
}
|
||||
|
||||
func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error {
|
||||
return opts.notifier.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||
Datacenter: opts.key.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: opts.token},
|
||||
ServiceKind: structs.ServiceKindMeshGateway,
|
||||
UseServiceKind: true,
|
||||
Source: opts.source,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(opts.key.Partition),
|
||||
}, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID), opts.notifyCh)
|
||||
}
|
||||
|
@ -649,8 +649,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||
"upstream-target:api-failover-remote.default.default.dc2:api-failover-remote?dc=dc2": genVerifyServiceWatch("api-failover-remote", "", "dc2", true),
|
||||
"upstream-target:api-failover-local.default.default.dc2:api-failover-local?dc=dc2": genVerifyServiceWatch("api-failover-local", "", "dc2", true),
|
||||
"upstream-target:api-failover-direct.default.default.dc2:api-failover-direct?dc=dc2": genVerifyServiceWatch("api-failover-direct", "", "dc2", true),
|
||||
"mesh-gateway:dc2:api-failover-remote?dc=dc2": genVerifyGatewayWatch("dc2"),
|
||||
"mesh-gateway:dc1:api-failover-local?dc=dc2": genVerifyGatewayWatch("dc1"),
|
||||
"mesh-gateway:default.dc2:api-failover-remote?dc=dc2": genVerifyGatewayWatch("dc2"),
|
||||
"mesh-gateway:default.dc1:api-failover-local?dc=dc2": genVerifyGatewayWatch("dc1"),
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid())
|
||||
@ -673,7 +673,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||
}
|
||||
|
||||
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
|
||||
stage1.requiredWatches["mesh-gateway:dc1:api-dc2"] = genVerifyGatewayWatch("dc1")
|
||||
stage1.requiredWatches["mesh-gateway:default.dc1:api-dc2"] = genVerifyGatewayWatch("dc1")
|
||||
}
|
||||
|
||||
return testCase{
|
||||
|
@ -1429,7 +1429,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
||||
TestUpstreamNodesDC2(t)
|
||||
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
|
||||
"db": {
|
||||
"dc2": TestGatewayNodesDC2(t),
|
||||
"default.dc2": TestGatewayNodesDC2(t),
|
||||
},
|
||||
}
|
||||
case "failover-through-double-remote-gateway-triggered":
|
||||
@ -1442,8 +1442,8 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
||||
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
|
||||
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
|
||||
"db": {
|
||||
"dc2": TestGatewayNodesDC2(t),
|
||||
"dc3": TestGatewayNodesDC3(t),
|
||||
"default.dc2": TestGatewayNodesDC2(t),
|
||||
"default.dc3": TestGatewayNodesDC3(t),
|
||||
},
|
||||
}
|
||||
case "failover-through-local-gateway-triggered":
|
||||
@ -1455,7 +1455,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
||||
TestUpstreamNodesDC2(t)
|
||||
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
|
||||
"db": {
|
||||
"dc1": TestGatewayNodesDC1(t),
|
||||
"default.dc1": TestGatewayNodesDC1(t),
|
||||
},
|
||||
}
|
||||
case "failover-through-double-local-gateway-triggered":
|
||||
@ -1468,7 +1468,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
||||
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
|
||||
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
|
||||
"db": {
|
||||
"dc1": TestGatewayNodesDC1(t),
|
||||
"default.dc1": TestGatewayNodesDC1(t),
|
||||
},
|
||||
}
|
||||
case "splitter-with-resolver-redirect-multidc":
|
||||
@ -1737,9 +1737,10 @@ func testConfigSnapshotIngressGateway(
|
||||
{protocol, 9191}: {
|
||||
{
|
||||
// We rely on this one having default type in a few tests...
|
||||
DestinationName: "db",
|
||||
LocalBindPort: 9191,
|
||||
LocalBindAddress: "2.3.4.5",
|
||||
DestinationName: "db",
|
||||
DestinationPartition: "default",
|
||||
LocalBindPort: 9191,
|
||||
LocalBindAddress: "2.3.4.5",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -118,14 +118,15 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||
dc, svc, ok := removeColonPrefix(correlationID)
|
||||
key, svc, ok := removeColonPrefix(correlationID)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
|
||||
}
|
||||
if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[svc]; !ok {
|
||||
upstreamsSnapshot.WatchedGatewayEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
|
||||
}
|
||||
upstreamsSnapshot.WatchedGatewayEndpoints[svc][dc] = resp.Nodes
|
||||
upstreamsSnapshot.WatchedGatewayEndpoints[svc][key] = resp.Nodes
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
|
||||
}
|
||||
@ -207,11 +208,22 @@ func (s *handlerUpstreams) resetWatchesFromChain(
|
||||
|
||||
// We'll get endpoints from the gateway query, but the health still has
|
||||
// to come from the backing service query.
|
||||
var gk GatewayKey
|
||||
|
||||
switch target.MeshGateway.Mode {
|
||||
case structs.MeshGatewayModeRemote:
|
||||
needGateways[target.Datacenter] = struct{}{}
|
||||
gk = GatewayKey{
|
||||
Partition: target.Partition,
|
||||
Datacenter: target.Datacenter,
|
||||
}
|
||||
case structs.MeshGatewayModeLocal:
|
||||
needGateways[s.source.Datacenter] = struct{}{}
|
||||
gk = GatewayKey{
|
||||
Partition: s.source.NodePartitionOrDefault(),
|
||||
Datacenter: s.source.Datacenter,
|
||||
}
|
||||
}
|
||||
if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
|
||||
needGateways[gk.String()] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
@ -240,38 +252,51 @@ func (s *handlerUpstreams) resetWatchesFromChain(
|
||||
}
|
||||
}
|
||||
|
||||
for dc := range needGateways {
|
||||
if _, ok := snap.WatchedGateways[id][dc]; ok {
|
||||
for key := range needGateways {
|
||||
if _, ok := snap.WatchedGateways[id][key]; ok {
|
||||
continue
|
||||
}
|
||||
gwKey := gatewayKeyFromString(key)
|
||||
|
||||
s.logger.Trace("initializing watch of mesh gateway in datacenter",
|
||||
s.logger.Trace("initializing watch of mesh gateway",
|
||||
"upstream", id,
|
||||
"chain", chain.ServiceName,
|
||||
"datacenter", dc,
|
||||
"datacenter", gwKey.Datacenter,
|
||||
"partition", gwKey.Partition,
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.watchMeshGateway(ctx, dc, id)
|
||||
opts := gatewayWatchOpts{
|
||||
notifier: s.cache,
|
||||
notifyCh: s.ch,
|
||||
source: *s.source,
|
||||
token: s.token,
|
||||
key: gwKey,
|
||||
upstreamID: id,
|
||||
}
|
||||
err := watchMeshGateway(ctx, opts)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
snap.WatchedGateways[id][dc] = cancel
|
||||
snap.WatchedGateways[id][key] = cancel
|
||||
}
|
||||
|
||||
for dc, cancelFn := range snap.WatchedGateways[id] {
|
||||
if _, ok := needGateways[dc]; ok {
|
||||
for key, cancelFn := range snap.WatchedGateways[id] {
|
||||
if _, ok := needGateways[key]; ok {
|
||||
continue
|
||||
}
|
||||
s.logger.Trace("stopping watch of mesh gateway in datacenter",
|
||||
gwKey := gatewayKeyFromString(key)
|
||||
|
||||
s.logger.Trace("stopping watch of mesh gateway",
|
||||
"upstream", id,
|
||||
"chain", chain.ServiceName,
|
||||
"datacenter", dc,
|
||||
"datacenter", gwKey.Datacenter,
|
||||
"partition", gwKey.Partition,
|
||||
)
|
||||
delete(snap.WatchedGateways[id], dc)
|
||||
delete(snap.WatchedGatewayEndpoints[id], dc)
|
||||
delete(snap.WatchedGateways[id], key)
|
||||
delete(snap.WatchedGatewayEndpoints[id], key)
|
||||
cancelFn()
|
||||
}
|
||||
|
||||
@ -287,17 +312,6 @@ type targetWatchOpts struct {
|
||||
entMeta *structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
func (s *handlerUpstreams) watchMeshGateway(ctx context.Context, dc string, upstreamID string) error {
|
||||
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceKind: structs.ServiceKindMeshGateway,
|
||||
UseServiceKind: true,
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
}, "mesh-gateway:"+dc+":"+upstreamID, s.ch)
|
||||
}
|
||||
|
||||
func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
|
||||
s.logger.Trace("initializing watch of target",
|
||||
"upstream", opts.upstreamID,
|
||||
|
Loading…
x
Reference in New Issue
Block a user