mirror of
https://github.com/status-im/consul.git
synced 2025-01-24 12:40:17 +00:00
Support ingress gateways in mesh viz endpoint (#8864)
Co-authored-by: R.B. Boyer <rb@hashicorp.com>
This commit is contained in:
parent
418fe6ba32
commit
164ce57db2
@ -2542,9 +2542,14 @@ func updateGatewayServices(tx *txn, idx uint64, conf structs.ConfigEntry, entMet
|
||||
}
|
||||
|
||||
// Delete all associated with gateway first, to avoid keeping mappings that were removed
|
||||
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceName(conf.GetName(), entMeta)); err != nil {
|
||||
sn := structs.NewServiceName(conf.GetName(), entMeta)
|
||||
|
||||
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
|
||||
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
||||
}
|
||||
if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil {
|
||||
return fmt.Errorf("failed to truncate mesh topology for gateway: %v", err)
|
||||
}
|
||||
|
||||
for _, svc := range gatewayServices {
|
||||
// If the service is a wildcard we need to target all services within the namespace
|
||||
@ -2734,6 +2739,10 @@ func updateGatewayService(tx *txn, idx uint64, mapping *structs.GatewayService)
|
||||
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
|
||||
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
||||
}
|
||||
|
||||
if err := insertGatewayServiceTopologyMapping(tx, idx, mapping); err != nil {
|
||||
return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -2785,6 +2794,9 @@ func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) erro
|
||||
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
|
||||
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
||||
}
|
||||
if err := deleteGatewayServiceTopologyMapping(tx, idx, gs); err != nil {
|
||||
return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3229,3 +3241,56 @@ func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) erro
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func insertGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error {
|
||||
// Only ingress gateways are standalone items in the mesh topology viz
|
||||
if gs.GatewayKind != structs.ServiceKindIngressGateway || gs.Service.Name == structs.WildcardSpecifier {
|
||||
return nil
|
||||
}
|
||||
|
||||
mapping := structs.UpstreamDownstream{
|
||||
Upstream: gs.Service,
|
||||
Downstream: gs.Gateway,
|
||||
RaftIndex: gs.RaftIndex,
|
||||
}
|
||||
if err := tx.Insert(topologyTableName, &mapping); err != nil {
|
||||
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteGatewayServiceTopologyMapping(tx *txn, idx uint64, gs *structs.GatewayService) error {
|
||||
// Only ingress gateways are standalone items in the mesh topology viz
|
||||
if gs.GatewayKind != structs.ServiceKindIngressGateway {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := tx.DeleteAll(topologyTableName, "id", gs.Service, gs.Gateway); err != nil {
|
||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func truncateGatewayServiceTopologyMappings(tx *txn, idx uint64, gateway structs.ServiceName, kind string) error {
|
||||
// Only ingress gateways are standalone items in the mesh topology viz
|
||||
if kind != string(structs.ServiceKindIngressGateway) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := tx.DeleteAll(topologyTableName, "downstream", gateway); err != nil {
|
||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -6708,6 +6708,218 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
|
||||
require.Empty(t, exp.names)
|
||||
}
|
||||
|
||||
func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) {
|
||||
type expect struct {
|
||||
idx uint64
|
||||
names []structs.ServiceName
|
||||
}
|
||||
|
||||
s := testStateStore(t)
|
||||
|
||||
require.NoError(t, s.EnsureNode(0, &structs.Node{
|
||||
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
|
||||
Node: "foo",
|
||||
}))
|
||||
require.NoError(t, s.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
}, nil))
|
||||
|
||||
defaultMeta := structs.DefaultEnterpriseMeta()
|
||||
ingress := structs.NewServiceName("ingress", defaultMeta)
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
tx := s.db.ReadTxn()
|
||||
idx, names, err := upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
assert.Zero(t, idx)
|
||||
assert.Len(t, names, 0)
|
||||
|
||||
// Watch should fire since the ingress -> [web, api] mappings were inserted into the topology table
|
||||
require.NoError(t, s.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "api",
|
||||
EnterpriseMeta: *defaultMeta,
|
||||
},
|
||||
{
|
||||
Name: "web",
|
||||
EnterpriseMeta: *defaultMeta,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
|
||||
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
|
||||
exp := expect{
|
||||
idx: 2,
|
||||
names: []structs.ServiceName{
|
||||
{Name: "api", EnterpriseMeta: *defaultMeta},
|
||||
{Name: "web", EnterpriseMeta: *defaultMeta},
|
||||
},
|
||||
}
|
||||
require.Equal(t, exp.idx, idx)
|
||||
require.ElementsMatch(t, exp.names, names)
|
||||
|
||||
// Now delete a gateway service and topology table should be updated
|
||||
require.NoError(t, s.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "api",
|
||||
EnterpriseMeta: *defaultMeta,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
|
||||
exp = expect{
|
||||
// Expect index where the upstream was replaced
|
||||
idx: 3,
|
||||
names: []structs.ServiceName{
|
||||
{Name: "api", EnterpriseMeta: *defaultMeta},
|
||||
},
|
||||
}
|
||||
require.Equal(t, exp.idx, idx)
|
||||
require.ElementsMatch(t, exp.names, names)
|
||||
|
||||
// Now replace api with a wildcard and no services should be returned because none are registered
|
||||
require.NoError(t, s.EnsureConfigEntry(4, &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "*",
|
||||
EnterpriseMeta: *defaultMeta,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(4), idx)
|
||||
require.Len(t, names, 0)
|
||||
|
||||
// Adding a service will be covered by the ingress wildcard and added to the topology
|
||||
svc := structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
Address: "127.0.0.3",
|
||||
Port: 443,
|
||||
EnterpriseMeta: *defaultMeta,
|
||||
}
|
||||
require.NoError(t, s.EnsureService(5, "foo", &svc))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
|
||||
exp = expect{
|
||||
// Expect index where the upstream was replaced
|
||||
idx: 5,
|
||||
names: []structs.ServiceName{
|
||||
{Name: "db", EnterpriseMeta: *defaultMeta},
|
||||
},
|
||||
}
|
||||
require.Equal(t, exp.idx, idx)
|
||||
require.ElementsMatch(t, exp.names, names)
|
||||
|
||||
// Deleting a service covered by a wildcard should delete its mapping
|
||||
require.NoError(t, s.DeleteService(6, "foo", svc.ID, &svc.EnterpriseMeta))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(6), idx)
|
||||
require.Len(t, names, 0)
|
||||
|
||||
// Now add a service again, to test the effect of deleting the config entry itself
|
||||
require.NoError(t, s.EnsureConfigEntry(7, &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "api",
|
||||
EnterpriseMeta: *defaultMeta,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
|
||||
exp = expect{
|
||||
// Expect index where the upstream was replaced
|
||||
idx: 7,
|
||||
names: []structs.ServiceName{
|
||||
{Name: "api", EnterpriseMeta: *defaultMeta},
|
||||
},
|
||||
}
|
||||
require.Equal(t, exp.idx, idx)
|
||||
require.ElementsMatch(t, exp.names, names)
|
||||
|
||||
// Deleting the config entry should remove the mapping
|
||||
require.NoError(t, s.DeleteConfigEntry(8, "ingress-gateway", "ingress", defaultMeta))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
tx = s.db.ReadTxn()
|
||||
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, ingress)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(8), idx)
|
||||
require.Len(t, names, 0)
|
||||
}
|
||||
|
||||
func TestCatalog_DownstreamsForService(t *testing.T) {
|
||||
defaultMeta := structs.DefaultEnterpriseMeta()
|
||||
|
||||
|
@ -265,14 +265,25 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
|
||||
|
||||
// If the config entry is for terminating or ingress gateways we delete entries from the memdb table
|
||||
// that associates gateways <-> services.
|
||||
sn := structs.NewServiceName(name, entMeta)
|
||||
|
||||
if kind == structs.TerminatingGateway || kind == structs.IngressGateway {
|
||||
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)); err != nil {
|
||||
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
|
||||
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
|
||||
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
||||
}
|
||||
}
|
||||
// Also clean up associations in the mesh topology table for ingress gateways
|
||||
if kind == structs.IngressGateway {
|
||||
if _, err := tx.DeleteAll(topologyTableName, "downstream", sn); err != nil {
|
||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
||||
}
|
||||
}
|
||||
|
||||
err = validateProposedConfigEntryInGraph(tx, kind, name, nil, entMeta)
|
||||
if err != nil {
|
||||
|
@ -439,7 +439,7 @@ func prepSummaryOutput(summaries map[structs.ServiceName]*ServiceSummary, exclud
|
||||
sum.ChecksCritical++
|
||||
}
|
||||
}
|
||||
if excludeSidecars && sum.Kind != structs.ServiceKindTypical {
|
||||
if excludeSidecars && sum.Kind != structs.ServiceKindTypical && sum.Kind != structs.ServiceKindIngressGateway {
|
||||
continue
|
||||
}
|
||||
resp = append(resp, sum)
|
||||
|
@ -828,6 +828,7 @@ func TestUIGatewayIntentions(t *testing.T) {
|
||||
|
||||
a := NewTestAgent(t, "")
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForServiceIntentions(t, a.RPC, "dc1")
|
||||
|
||||
// Register terminating gateway and config entry linking it to postgres + redis
|
||||
{
|
||||
@ -940,6 +941,31 @@ func TestUIServiceTopology(t *testing.T) {
|
||||
// Register api -> web -> redis
|
||||
{
|
||||
registrations := map[string]*structs.RegisterRequest{
|
||||
"Node edge": {
|
||||
Datacenter: "dc1",
|
||||
Node: "edge",
|
||||
Address: "127.0.0.20",
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "edge",
|
||||
CheckID: "edge:alive",
|
||||
Name: "edge-liveness",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
},
|
||||
"Ingress gateway on edge": {
|
||||
Datacenter: "dc1",
|
||||
Node: "edge",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindIngressGateway,
|
||||
ID: "ingress",
|
||||
Service: "ingress",
|
||||
Port: 443,
|
||||
Address: "198.18.1.20",
|
||||
},
|
||||
},
|
||||
"Node foo": {
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
@ -1205,7 +1231,8 @@ func TestUIServiceTopology(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Add intentions: deny all, web -> redis with L7 perms, but omit intention for api -> web
|
||||
// Add intentions: deny all, ingress -> api, web -> redis with L7 perms, but omit intention for api -> web
|
||||
// Add ingress config: ingress -> api
|
||||
{
|
||||
entries := []structs.ConfigEntryRequest{
|
||||
{
|
||||
@ -1252,6 +1279,38 @@ func TestUIServiceTopology(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceIntentionsConfigEntry{
|
||||
Kind: structs.ServiceIntentions,
|
||||
Name: "api",
|
||||
Sources: []*structs.SourceIntention{
|
||||
{
|
||||
Name: "ingress",
|
||||
Action: structs.IntentionActionAllow,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "api",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, req := range entries {
|
||||
out := false
|
||||
@ -1259,6 +1318,45 @@ func TestUIServiceTopology(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("ingress", func(t *testing.T) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Request topology for ingress
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.UIServiceTopology(resp, req)
|
||||
assert.Nil(r, err)
|
||||
require.NoError(r, checkIndex(resp))
|
||||
|
||||
expect := ServiceTopology{
|
||||
Upstreams: []*ServiceTopologySummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
Name: "api",
|
||||
Datacenter: "dc1",
|
||||
Nodes: []string{"foo"},
|
||||
InstanceCount: 1,
|
||||
ChecksPassing: 3,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
},
|
||||
Intention: structs.IntentionDecisionSummary{
|
||||
Allowed: true,
|
||||
HasPermissions: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
FilteredByACLs: false,
|
||||
}
|
||||
result := obj.(ServiceTopology)
|
||||
|
||||
// Internal accounting that is not returned in JSON response
|
||||
for _, u := range result.Upstreams {
|
||||
u.externalSourceSet = nil
|
||||
u.checks = nil
|
||||
}
|
||||
require.Equal(r, expect, result)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("api", func(t *testing.T) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Request topology for api
|
||||
@ -1269,6 +1367,23 @@ func TestUIServiceTopology(t *testing.T) {
|
||||
require.NoError(r, checkIndex(resp))
|
||||
|
||||
expect := ServiceTopology{
|
||||
Downstreams: []*ServiceTopologySummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
Name: "ingress",
|
||||
Kind: structs.ServiceKindIngressGateway,
|
||||
Datacenter: "dc1",
|
||||
Nodes: []string{"edge"},
|
||||
InstanceCount: 1,
|
||||
ChecksPassing: 1,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
},
|
||||
Intention: structs.IntentionDecisionSummary{
|
||||
Allowed: true,
|
||||
HasPermissions: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
Upstreams: []*ServiceTopologySummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
@ -1297,6 +1412,10 @@ func TestUIServiceTopology(t *testing.T) {
|
||||
u.externalSourceSet = nil
|
||||
u.checks = nil
|
||||
}
|
||||
for _, d := range result.Downstreams {
|
||||
d.externalSourceSet = nil
|
||||
d.checks = nil
|
||||
}
|
||||
require.Equal(r, expect, result)
|
||||
})
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user