mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
Fix up enterprise compatibility for gateways (#7813)
This commit is contained in:
parent
9b363e9f23
commit
c32a4f1ece
@ -1006,23 +1006,23 @@ func (s *Store) maxIndexAndWatchChForService(tx *memdb.Txn, serviceName string,
|
|||||||
|
|
||||||
// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes
|
// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes
|
||||||
func (s *Store) maxIndexAndWatchChsForServiceNodes(tx *memdb.Txn,
|
func (s *Store) maxIndexAndWatchChsForServiceNodes(tx *memdb.Txn,
|
||||||
nodes structs.ServiceNodes, watchChecks bool, entMeta *structs.EnterpriseMeta) (uint64, []<-chan struct{}) {
|
nodes structs.ServiceNodes, watchChecks bool) (uint64, []<-chan struct{}) {
|
||||||
|
|
||||||
var watchChans []<-chan struct{}
|
var watchChans []<-chan struct{}
|
||||||
var maxIdx uint64
|
var maxIdx uint64
|
||||||
|
|
||||||
seen := make(map[string]bool)
|
seen := make(map[structs.ServiceID]bool)
|
||||||
for i := 0; i < len(nodes); i++ {
|
for i := 0; i < len(nodes); i++ {
|
||||||
svc := nodes[i].ServiceName
|
sid := structs.NewServiceID(nodes[i].ServiceName, &nodes[i].EnterpriseMeta)
|
||||||
if ok := seen[svc]; !ok {
|
if ok := seen[sid]; !ok {
|
||||||
idx, svcCh := s.maxIndexAndWatchChForService(tx, svc, true, watchChecks, entMeta)
|
idx, svcCh := s.maxIndexAndWatchChForService(tx, sid.ID, true, watchChecks, &sid.EnterpriseMeta)
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
if svcCh != nil {
|
if svcCh != nil {
|
||||||
watchChans = append(watchChans, svcCh)
|
watchChans = append(watchChans, svcCh)
|
||||||
}
|
}
|
||||||
seen[svc] = true
|
seen[sid] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1078,7 +1078,7 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Watch for index changes to the gateway nodes
|
// Watch for index changes to the gateway nodes
|
||||||
svcIdx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta)
|
svcIdx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false)
|
||||||
if svcIdx > idx {
|
if svcIdx > idx {
|
||||||
idx = svcIdx
|
idx = svcIdx
|
||||||
}
|
}
|
||||||
@ -1098,6 +1098,8 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
|
// TODO (gateways) (freddy) Why do we always consider the main service index here?
|
||||||
|
// This doesn't seem to make sense for Connect when there's more than 1 result
|
||||||
svcIdx := s.maxIndexForService(tx, serviceName, len(results) > 0, false, entMeta)
|
svcIdx := s.maxIndexForService(tx, serviceName, len(results) > 0, false, entMeta)
|
||||||
if idx < svcIdx {
|
if idx < svcIdx {
|
||||||
idx = svcIdx
|
idx = svcIdx
|
||||||
@ -2006,11 +2008,11 @@ func (s *Store) CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string,
|
|||||||
|
|
||||||
// TODO(ingress) : Deal with incorporating index from mapping table
|
// TODO(ingress) : Deal with incorporating index from mapping table
|
||||||
// Watch for index changes to the gateway nodes
|
// Watch for index changes to the gateway nodes
|
||||||
idx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta)
|
idx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false)
|
||||||
maxIdx = lib.MaxUint64(maxIdx, idx)
|
|
||||||
for _, ch := range chans {
|
for _, ch := range chans {
|
||||||
ws.Add(ch)
|
ws.Add(ch)
|
||||||
}
|
}
|
||||||
|
maxIdx = lib.MaxUint64(maxIdx, idx)
|
||||||
|
|
||||||
// TODO(ingress): Test namespace functionality here
|
// TODO(ingress): Test namespace functionality here
|
||||||
// De-dup services to lookup
|
// De-dup services to lookup
|
||||||
@ -2065,11 +2067,13 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
|
|||||||
// service name IFF there is at least one Connect-native instance of that
|
// service name IFF there is at least one Connect-native instance of that
|
||||||
// service. Either way there is usually only one distinct name if proxies are
|
// service. Either way there is usually only one distinct name if proxies are
|
||||||
// named consistently but could be multiple.
|
// named consistently but could be multiple.
|
||||||
serviceNames := make(map[string]struct{}, 2)
|
serviceNames := make(map[structs.ServiceID]struct{}, 2)
|
||||||
for service := iter.Next(); service != nil; service = iter.Next() {
|
for service := iter.Next(); service != nil; service = iter.Next() {
|
||||||
sn := service.(*structs.ServiceNode)
|
sn := service.(*structs.ServiceNode)
|
||||||
results = append(results, sn)
|
results = append(results, sn)
|
||||||
serviceNames[sn.ServiceName] = struct{}{}
|
|
||||||
|
sid := structs.NewServiceID(sn.ServiceName, &sn.EnterpriseMeta)
|
||||||
|
serviceNames[sid] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we are querying for Connect nodes, the associated proxy might be a terminating-gateway.
|
// If we are querying for Connect nodes, the associated proxy might be a terminating-gateway.
|
||||||
@ -2086,7 +2090,9 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
|
|||||||
idx = lib.MaxUint64(idx, gwIdx)
|
idx = lib.MaxUint64(idx, gwIdx)
|
||||||
for i := 0; i < len(nodes); i++ {
|
for i := 0; i < len(nodes); i++ {
|
||||||
results = append(results, nodes[i])
|
results = append(results, nodes[i])
|
||||||
serviceNames[nodes[i].ServiceName] = struct{}{}
|
|
||||||
|
sid := structs.NewServiceID(nodes[i].ServiceName, &nodes[i].EnterpriseMeta)
|
||||||
|
serviceNames[sid] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2109,7 +2115,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
|
|||||||
// We know service values should exist since the serviceNames map is only
|
// We know service values should exist since the serviceNames map is only
|
||||||
// populated if there is at least one result above. so serviceExists arg
|
// populated if there is at least one result above. so serviceExists arg
|
||||||
// below is always true.
|
// below is always true.
|
||||||
svcIdx, svcCh := s.maxIndexAndWatchChForService(tx, svcName, true, true, entMeta)
|
svcIdx, svcCh := s.maxIndexAndWatchChForService(tx, svcName.ID, true, true, &svcName.EnterpriseMeta)
|
||||||
// Take the max index represented
|
// Take the max index represented
|
||||||
idx = lib.MaxUint64(idx, svcIdx)
|
idx = lib.MaxUint64(idx, svcIdx)
|
||||||
if svcCh != nil {
|
if svcCh != nil {
|
||||||
@ -2469,7 +2475,7 @@ func (s *Store) updateGatewayServices(tx *memdb.Txn, idx uint64, conf structs.Co
|
|||||||
var gatewayServices structs.GatewayServices
|
var gatewayServices structs.GatewayServices
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
gatewayID := structs.NewServiceID(conf.GetName(), conf.GetEnterpriseMeta())
|
gatewayID := structs.NewServiceID(conf.GetName(), entMeta)
|
||||||
switch conf.GetKind() {
|
switch conf.GetKind() {
|
||||||
case structs.IngressGateway:
|
case structs.IngressGateway:
|
||||||
gatewayServices, err = s.ingressConfigGatewayServices(tx, gatewayID, conf, entMeta)
|
gatewayServices, err = s.ingressConfigGatewayServices(tx, gatewayID, conf, entMeta)
|
||||||
@ -2742,8 +2748,8 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st
|
|||||||
exists = true
|
exists = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// This prevents the index from sliding back in case all instances of the service are deregistered
|
// This prevents the index from sliding back if case all instances of the gateway service are deregistered
|
||||||
svcIdx := s.maxIndexForService(tx, mapping.Gateway.ID, exists, false, &mapping.Service.EnterpriseMeta)
|
svcIdx := s.maxIndexForService(tx, mapping.Gateway.ID, exists, false, &mapping.Gateway.EnterpriseMeta)
|
||||||
maxIdx = lib.MaxUint64(maxIdx, svcIdx)
|
maxIdx = lib.MaxUint64(maxIdx, svcIdx)
|
||||||
|
|
||||||
// Ensure that blocking queries wake up if the gateway-service mapping exists, but the gateway does not exist yet
|
// Ensure that blocking queries wake up if the gateway-service mapping exists, but the gateway does not exist yet
|
||||||
|
@ -1251,7 +1251,6 @@ func TestStore_ReadDiscoveryChainConfigEntries_SubsetSplit(t *testing.T) {
|
|||||||
require.Len(t, entrySet.Services, 1)
|
require.Len(t, entrySet.Services, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ingress): test that having the same name in different namespace is valid
|
|
||||||
func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) {
|
func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
@ -560,6 +560,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Used in terminating-gateway cases to account for differences in OSS/ent implementations of ServiceID.String()
|
||||||
|
db := structs.NewServiceID("db", nil)
|
||||||
|
dbStr := db.String()
|
||||||
|
|
||||||
cases := map[string]testCase{
|
cases := map[string]testCase{
|
||||||
"initial-gateway": testCase{
|
"initial-gateway": testCase{
|
||||||
ns: structs.NodeService{
|
ns: structs.NodeService{
|
||||||
@ -1002,11 +1006,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||||||
},
|
},
|
||||||
verificationStage{
|
verificationStage{
|
||||||
requiredWatches: map[string]verifyWatchRequest{
|
requiredWatches: map[string]verifyWatchRequest{
|
||||||
"external-service:db": genVerifyServiceWatch("db", "", "dc1", false),
|
"external-service:" + dbStr: genVerifyServiceWatch("db", "", "dc1", false),
|
||||||
},
|
},
|
||||||
events: []cache.UpdateEvent{
|
events: []cache.UpdateEvent{
|
||||||
cache.UpdateEvent{
|
cache.UpdateEvent{
|
||||||
CorrelationID: "external-service:db",
|
CorrelationID: "external-service:" + dbStr,
|
||||||
Result: &structs.IndexedCheckServiceNodes{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: structs.CheckServiceNodes{
|
Nodes: structs.CheckServiceNodes{
|
||||||
{
|
{
|
||||||
@ -1044,11 +1048,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||||||
},
|
},
|
||||||
verificationStage{
|
verificationStage{
|
||||||
requiredWatches: map[string]verifyWatchRequest{
|
requiredWatches: map[string]verifyWatchRequest{
|
||||||
"service-leaf:db": genVerifyLeafWatch("db", "dc1"),
|
"service-leaf:" + dbStr: genVerifyLeafWatch("db", "dc1"),
|
||||||
},
|
},
|
||||||
events: []cache.UpdateEvent{
|
events: []cache.UpdateEvent{
|
||||||
cache.UpdateEvent{
|
cache.UpdateEvent{
|
||||||
CorrelationID: "service-leaf:db",
|
CorrelationID: "service-leaf:" + dbStr,
|
||||||
Result: issuedCert,
|
Result: issuedCert,
|
||||||
Err: nil,
|
Err: nil,
|
||||||
},
|
},
|
||||||
@ -1059,11 +1063,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||||||
},
|
},
|
||||||
verificationStage{
|
verificationStage{
|
||||||
requiredWatches: map[string]verifyWatchRequest{
|
requiredWatches: map[string]verifyWatchRequest{
|
||||||
"service-resolver:db": genVerifyResolverWatch("db", "dc1", structs.ServiceResolver),
|
"service-resolver:" + dbStr: genVerifyResolverWatch("db", "dc1", structs.ServiceResolver),
|
||||||
},
|
},
|
||||||
events: []cache.UpdateEvent{
|
events: []cache.UpdateEvent{
|
||||||
cache.UpdateEvent{
|
cache.UpdateEvent{
|
||||||
CorrelationID: "service-resolver:db",
|
CorrelationID: "service-resolver:" + dbStr,
|
||||||
Result: &structs.IndexedConfigEntries{
|
Result: &structs.IndexedConfigEntries{
|
||||||
Kind: structs.ServiceResolver,
|
Kind: structs.ServiceResolver,
|
||||||
Entries: []structs.ConfigEntry{
|
Entries: []structs.ConfigEntry{
|
||||||
|
@ -89,6 +89,8 @@ func (e *IngressGatewayConfigEntry) Normalize() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
e.Kind = IngressGateway
|
e.Kind = IngressGateway
|
||||||
|
e.EnterpriseMeta.Normalize()
|
||||||
|
|
||||||
for i, listener := range e.Listeners {
|
for i, listener := range e.Listeners {
|
||||||
if listener.Protocol == "" {
|
if listener.Protocol == "" {
|
||||||
listener.Protocol = "tcp"
|
listener.Protocol = "tcp"
|
||||||
@ -96,6 +98,7 @@ func (e *IngressGatewayConfigEntry) Normalize() error {
|
|||||||
|
|
||||||
listener.Protocol = strings.ToLower(listener.Protocol)
|
listener.Protocol = strings.ToLower(listener.Protocol)
|
||||||
for i := range listener.Services {
|
for i := range listener.Services {
|
||||||
|
listener.Services[i].EnterpriseMeta.Merge(&e.EnterpriseMeta)
|
||||||
listener.Services[i].EnterpriseMeta.Normalize()
|
listener.Services[i].EnterpriseMeta.Normalize()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -104,8 +107,6 @@ func (e *IngressGatewayConfigEntry) Normalize() error {
|
|||||||
e.Listeners[i] = listener
|
e.Listeners[i] = listener
|
||||||
}
|
}
|
||||||
|
|
||||||
e.EnterpriseMeta.Normalize()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,11 +254,12 @@ func (e *TerminatingGatewayConfigEntry) Normalize() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
e.Kind = TerminatingGateway
|
e.Kind = TerminatingGateway
|
||||||
|
e.EnterpriseMeta.Normalize()
|
||||||
|
|
||||||
for i := range e.Services {
|
for i := range e.Services {
|
||||||
|
e.Services[i].EnterpriseMeta.Merge(&e.EnterpriseMeta)
|
||||||
e.Services[i].EnterpriseMeta.Normalize()
|
e.Services[i].EnterpriseMeta.Normalize()
|
||||||
}
|
}
|
||||||
e.EnterpriseMeta.Normalize()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -654,7 +654,8 @@ func (s *Server) sniFilterChainTerminatingGateway(listener, cluster, token strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
// The cluster name here doesn't matter as the sni_cluster filter will fill it in for us.
|
// The cluster name here doesn't matter as the sni_cluster filter will fill it in for us.
|
||||||
tcpProxy, err := makeTCPProxyFilter(listener, "", fmt.Sprintf("terminating_gateway_%s_", service.String()))
|
statPrefix := fmt.Sprintf("terminating_gateway_%s_%s_", service.NamespaceOrDefault(), service.ID)
|
||||||
|
tcpProxy, err := makeTCPProxyFilter(listener, "", statPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return envoylistener.FilterChain{}, err
|
return envoylistener.FilterChain{}, err
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_api_foo_tcp"
|
"stat_prefix": "terminating_gateway_default_api_foo_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -124,7 +124,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_web_foo_tcp"
|
"stat_prefix": "terminating_gateway_default_web_foo_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -214,7 +214,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_api_wan_tcp"
|
"stat_prefix": "terminating_gateway_default_api_wan_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -273,7 +273,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_web_wan_tcp"
|
"stat_prefix": "terminating_gateway_default_web_wan_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
@ -65,7 +65,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_web_default_tcp"
|
"stat_prefix": "terminating_gateway_default_web_default_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
@ -65,7 +65,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_api_default_tcp"
|
"stat_prefix": "terminating_gateway_default_api_default_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -124,7 +124,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_web_default_tcp"
|
"stat_prefix": "terminating_gateway_default_web_default_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -183,7 +183,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_web_default_tcp"
|
"stat_prefix": "terminating_gateway_default_web_default_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -242,7 +242,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_web_default_tcp"
|
"stat_prefix": "terminating_gateway_default_web_default_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
@ -65,7 +65,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_api_default_tcp"
|
"stat_prefix": "terminating_gateway_default_api_default_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@ -124,7 +124,7 @@
|
|||||||
"name": "envoy.tcp_proxy",
|
"name": "envoy.tcp_proxy",
|
||||||
"config": {
|
"config": {
|
||||||
"cluster": "",
|
"cluster": "",
|
||||||
"stat_prefix": "terminating_gateway_web_default_tcp"
|
"stat_prefix": "terminating_gateway_default_web_default_tcp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user