diff --git a/agent/cache-types/service_dump.go b/agent/cache-types/service_dump.go index 6bb633b77f..88ea1ed68b 100644 --- a/agent/cache-types/service_dump.go +++ b/agent/cache-types/service_dump.go @@ -41,7 +41,7 @@ func (c *InternalServiceDump) Fetch(opts cache.FetchOptions, req cache.Request) reqReal.AllowStale = true // Fetch - var reply structs.IndexedCheckServiceNodes + var reply structs.IndexedNodesWithGateways if err := c.RPC.RPC("Internal.ServiceDump", reqReal, &reply); err != nil { return result, err } diff --git a/agent/cache-types/service_dump_test.go b/agent/cache-types/service_dump_test.go index 4c355e13f6..0f49c965fa 100644 --- a/agent/cache-types/service_dump_test.go +++ b/agent/cache-types/service_dump_test.go @@ -16,7 +16,7 @@ func TestInternalServiceDump(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. - var resp *structs.IndexedCheckServiceNodes + var resp *structs.IndexedNodesWithGateways rpc.On("RPC", "Internal.ServiceDump", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { req := args.Get(1).(*structs.ServiceDumpRequest) @@ -24,7 +24,7 @@ func TestInternalServiceDump(t *testing.T) { require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedCheckServiceNodes) + reply := args.Get(2).(*structs.IndexedNodesWithGateways) reply.Nodes = []structs.CheckServiceNode{ {Service: &structs.NodeService{Kind: req.ServiceKind, Service: "foo"}}, } diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 63ae9ed299..3f60b1015e 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -88,7 +88,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, }) } -func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedCheckServiceNodes) error { +func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedNodesWithGateways) error { if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, args, reply); done { return err } @@ -107,13 +107,30 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - index, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta) + // Get, store, and filter nodes + maxIdx, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta) if err != nil { return err } + reply.Nodes = nodes - reply.Index, reply.Nodes = index, nodes - if err := m.srv.filterACL(args.Token, reply); err != nil { + if err := m.srv.filterACL(args.Token, &reply.Nodes); err != nil { + return err + } + + // Get, store, and filter gateway services + idx, gatewayServices, err := state.DumpGatewayServices(ws) + if err != nil { + return err + } + reply.Gateways = gatewayServices + + if idx > maxIdx { + maxIdx = idx + } + reply.Index = maxIdx + + if err := m.srv.filterACL(args.Token, &reply.Gateways); err != nil { return err } diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index 6ca847c22c..da929f7e5a 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -568,35 +568,83 @@ func TestInternal_ServiceDump(t *testing.T) { // prep the cluster with some data we can use in our filters registerTestCatalogEntries(t, codec) - doRequest := func(t *testing.T, filter string) structs.CheckServiceNodes { + // Register a gateway config entry to ensure gateway-services is dumped + { + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: &structs.TerminatingGatewayConfigEntry{ + Name: "terminating-gateway", + Kind: structs.TerminatingGateway, + Services: []structs.LinkedService{ + { + Name: "api", + }, + { + Name: "cache", + }, + }, + }, + } + var configOutput bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + } + + doRequest := func(t *testing.T, filter string) structs.IndexedNodesWithGateways { t.Helper() args := structs.DCSpecificRequest{ Datacenter: "dc1", QueryOptions: structs.QueryOptions{Filter: filter}, } - var out structs.IndexedCheckServiceNodes + var out structs.IndexedNodesWithGateways require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out)) - return out.Nodes + + // The GatewayServices dump is currently cannot be bexpr filtered + // so the response should be the same in all subtests + expectedGW := structs.GatewayServices{ + { + Service: structs.ServiceName{Name: "api"}, + Gateway: structs.ServiceName{Name: "terminating-gateway"}, + GatewayKind: structs.ServiceKindTerminatingGateway, + }, + { + Service: structs.ServiceName{Name: "cache"}, + Gateway: structs.ServiceName{Name: "terminating-gateway"}, + GatewayKind: structs.ServiceKindTerminatingGateway, + }, + } + assert.Len(t, out.Gateways, 2) + assert.Equal(t, expectedGW[0].Service, out.Gateways[0].Service) + assert.Equal(t, expectedGW[0].Gateway, out.Gateways[0].Gateway) + assert.Equal(t, expectedGW[0].GatewayKind, out.Gateways[0].GatewayKind) + + assert.Equal(t, expectedGW[1].Service, out.Gateways[1].Service) + assert.Equal(t, expectedGW[1].Gateway, out.Gateways[1].Gateway) + assert.Equal(t, expectedGW[1].GatewayKind, out.Gateways[1].GatewayKind) + + return out } // Run the tests against the test server t.Run("No Filter", func(t *testing.T) { nodes := doRequest(t, "") // redis (3), web (3), critical (1), warning (1) and consul (1) - require.Len(t, nodes, 9) + require.Len(t, nodes.Nodes, 9) + }) t.Run("Filter Node foo and service version 1", func(t *testing.T) { - nodes := doRequest(t, "Node.Node == foo and Service.Meta.version == 1") - require.Len(t, nodes, 1) - require.Equal(t, "redis", nodes[0].Service.Service) - require.Equal(t, "redisV1", nodes[0].Service.ID) + resp := doRequest(t, "Node.Node == foo and Service.Meta.version == 1") + require.Len(t, resp.Nodes, 1) + require.Equal(t, "redis", resp.Nodes[0].Service.Service) + require.Equal(t, "redisV1", resp.Nodes[0].Service.ID) }) t.Run("Filter service web", func(t *testing.T) { - nodes := doRequest(t, "Service.Service == web") - require.Len(t, nodes, 3) + resp := doRequest(t, "Service.Service == web") + require.Len(t, resp.Nodes, 3) }) } @@ -622,7 +670,7 @@ func TestInternal_ServiceDump_Kind(t *testing.T) { UseServiceKind: true, } - var out structs.IndexedCheckServiceNodes + var out structs.IndexedNodesWithGateways require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out)) return out.Nodes } diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 7edc6431e1..d68082ac0c 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2194,30 +2194,19 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *structs.EnterpriseMeta) (uint64, structs.GatewayServices, error) { tx := s.db.Txn(false) defer tx.Abort() + iter, err := gatewayServices(tx, gateway, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed gateway services lookup: %s", err) } ws.Add(iter.WatchCh()) - var maxIdx uint64 - var results structs.GatewayServices - for service := iter.Next(); service != nil; service = iter.Next() { - svc := service.(*structs.GatewayService) - - if svc.Service.Name != structs.WildcardSpecifier { - idx, matches, err := checkProtocolMatch(tx, ws, svc) - if err != nil { - return 0, nil, fmt.Errorf("failed checking protocol: %s", err) - } - maxIdx = lib.MaxUint64(maxIdx, idx) - if matches { - results = append(results, svc) - } - } + maxIdx, results, err := s.collectGatewayServices(tx, ws, iter) + if err != nil { + return 0, nil, err } - idx := maxIndexTxn(tx, gatewayServicesTableName) + return lib.MaxUint64(maxIdx, idx), results, nil } @@ -2723,6 +2712,48 @@ func gatewayServices(tx *txn, name string, entMeta *structs.EnterpriseMeta) (mem return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)) } +func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayServices, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + iter, err := tx.Get(gatewayServicesTableName, "id") + if err != nil { + return 0, nil, fmt.Errorf("failed to dump gateway-services: %s", err) + } + ws.Add(iter.WatchCh()) + + maxIdx, results, err := s.collectGatewayServices(tx, ws, iter) + if err != nil { + return 0, nil, err + } + idx := maxIndexTxn(tx, gatewayServicesTableName) + + return lib.MaxUint64(maxIdx, idx), results, nil +} + +func (s *Store) collectGatewayServices(tx *txn, ws memdb.WatchSet, iter memdb.ResultIterator) (uint64, structs.GatewayServices, error) { + var maxIdx uint64 + var results structs.GatewayServices + + for obj := iter.Next(); obj != nil; obj = iter.Next() { + gs := obj.(*structs.GatewayService) + maxIdx = lib.MaxUint64(maxIdx, gs.ModifyIndex) + + if gs.Service.Name != structs.WildcardSpecifier { + idx, matches, err := checkProtocolMatch(tx, ws, gs) + if err != nil { + return 0, nil, fmt.Errorf("failed checking protocol: %s", err) + } + maxIdx = lib.MaxUint64(maxIdx, idx) + + if matches { + results = append(results, gs) + } + } + } + return maxIdx, results, nil +} + // TODO(ingress): How to handle index rolling back when a config entry is // deleted that references a service? // We might need something like the service_last_extinction index? diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 183de2e80b..ca7340adde 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -5702,3 +5702,415 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { return ws } + +func TestStateStore_DumpGatewayServices(t *testing.T) { + s := testStateStore(t) + + // Listing with no results returns an empty list. + ws := memdb.NewWatchSet() + idx, nodes, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(0)) + assert.Len(t, nodes, 0) + + // Create some nodes + assert.Nil(t, s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + assert.Nil(t, s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"})) + assert.Nil(t, s.EnsureNode(12, &structs.Node{Node: "baz", Address: "127.0.0.2"})) + + // Typical services and some consul services spread across two nodes + assert.Nil(t, s.EnsureService(13, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) + assert.Nil(t, s.EnsureService(15, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) + assert.Nil(t, s.EnsureService(16, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + + ingressNS := &structs.NodeService{ + Kind: structs.ServiceKindIngressGateway, + ID: "ingress", + Service: "ingress", + Port: 8443, + } + assert.Nil(t, s.EnsureService(18, "baz", ingressNS)) + + // Register a gateway + terminatingNS := &structs.NodeService{ + Kind: structs.ServiceKindTerminatingGateway, + ID: "gateway", + Service: "gateway", + Port: 443, + } + assert.Nil(t, s.EnsureService(20, "baz", terminatingNS)) + + t.Run("add-tgw-config", func(t *testing.T) { + // Associate gateway with db and api + assert.Nil(t, s.EnsureConfigEntry(21, &structs.TerminatingGatewayConfigEntry{ + Kind: "terminating-gateway", + Name: "gateway", + Services: []structs.LinkedService{ + { + Name: "api", + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + }, + { + Name: "db", + }, + { + Name: "*", + CAFile: "ca.crt", + CertFile: "client.crt", + KeyFile: "client.key", + SNI: "my-alt-domain", + }, + }, + }, nil)) + assert.True(t, watchFired(ws)) + + // Read everything back. + ws = memdb.NewWatchSet() + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(21)) + assert.Len(t, out, 2) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceName("api", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + { + Service: structs.NewServiceName("db", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + } + assert.Equal(t, expect, out) + }) + + t.Run("no-op", func(t *testing.T) { + // Check watch doesn't fire on same exact config + assert.Nil(t, s.EnsureConfigEntry(21, &structs.TerminatingGatewayConfigEntry{ + Kind: "terminating-gateway", + Name: "gateway", + Services: []structs.LinkedService{ + { + Name: "api", + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + }, + { + Name: "db", + }, + { + Name: "*", + CAFile: "ca.crt", + CertFile: "client.crt", + KeyFile: "client.key", + SNI: "my-alt-domain", + }, + }, + }, nil)) + assert.False(t, watchFired(ws)) + + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(21)) + assert.Len(t, out, 2) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceName("api", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + { + Service: structs.NewServiceName("db", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + } + assert.Equal(t, expect, out) + }) + + // Add a service covered by wildcard + t.Run("add-wc-service", func(t *testing.T) { + assert.Nil(t, s.EnsureService(22, "bar", &structs.NodeService{ID: "redis", Service: "redis", Tags: nil, Address: "", Port: 6379})) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(22)) + assert.Len(t, out, 3) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceName("api", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + { + Service: structs.NewServiceName("db", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + { + Service: structs.NewServiceName("redis", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "ca.crt", + CertFile: "client.crt", + KeyFile: "client.key", + SNI: "my-alt-domain", + FromWildcard: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 22, + ModifyIndex: 22, + }, + }, + } + assert.Equal(t, expect, out) + }) + + // Delete a service covered by wildcard + t.Run("delete-wc-service", func(t *testing.T) { + assert.Nil(t, s.DeleteService(23, "bar", "redis", nil)) + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(23)) + assert.Len(t, out, 2) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceName("api", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + { + Service: structs.NewServiceName("db", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + RaftIndex: structs.RaftIndex{ + CreateIndex: 21, + ModifyIndex: 21, + }, + }, + } + assert.Equal(t, expect, out) + }) + + t.Run("delete-config-entry-svc", func(t *testing.T) { + // Update the entry that only leaves one service + assert.Nil(t, s.EnsureConfigEntry(24, &structs.TerminatingGatewayConfigEntry{ + Kind: "terminating-gateway", + Name: "gateway", + Services: []structs.LinkedService{ + { + Name: "db", + }, + }, + }, nil)) + assert.True(t, watchFired(ws)) + + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(24)) + assert.Len(t, out, 1) + + // previously associated service (api) should not be present + expect := structs.GatewayServices{ + { + Service: structs.NewServiceName("db", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + RaftIndex: structs.RaftIndex{ + CreateIndex: 24, + ModifyIndex: 24, + }, + }, + } + assert.Equal(t, expect, out) + }) + + t.Run("add-ingress-config", func(t *testing.T) { + svcDefault := &structs.ServiceConfigEntry{ + Name: "web", + Kind: structs.ServiceDefaults, + Protocol: "http", + } + assert.NoError(t, s.EnsureConfigEntry(25, svcDefault, nil)) + + // Associate gateway with db and api + assert.Nil(t, s.EnsureConfigEntry(26, &structs.IngressGatewayConfigEntry{ + Kind: "ingress-gateway", + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "api", + }, + }, + }, + { + Port: 2222, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "web", + Hosts: []string{"web.example.com"}, + }, + }, + }, + }, + }, nil)) + assert.True(t, watchFired(ws)) + + // Read everything back. + ws = memdb.NewWatchSet() + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(26)) + assert.Len(t, out, 3) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceName("db", nil), + Gateway: structs.NewServiceName("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + RaftIndex: structs.RaftIndex{ + CreateIndex: 24, + ModifyIndex: 24, + }, + }, + { + Service: structs.NewServiceName("api", nil), + Gateway: structs.NewServiceName("ingress", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Protocol: "tcp", + Port: 1111, + RaftIndex: structs.RaftIndex{ + CreateIndex: 26, + ModifyIndex: 26, + }, + }, + { + Service: structs.NewServiceName("web", nil), + Gateway: structs.NewServiceName("ingress", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Protocol: "http", + Port: 2222, + Hosts: []string{"web.example.com"}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 26, + ModifyIndex: 26, + }, + }, + } + assert.Equal(t, expect, out) + }) + + t.Run("delete-tgw-entry", func(t *testing.T) { + // Deleting the config entry should remove existing mappings + assert.Nil(t, s.DeleteConfigEntry(27, "terminating-gateway", "gateway", nil)) + assert.True(t, watchFired(ws)) + + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(27)) + assert.Len(t, out, 2) + + // Only ingress entries should remain + expect := structs.GatewayServices{ + { + Service: structs.NewServiceName("api", nil), + Gateway: structs.NewServiceName("ingress", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Protocol: "tcp", + Port: 1111, + RaftIndex: structs.RaftIndex{ + CreateIndex: 26, + ModifyIndex: 26, + }, + }, + { + Service: structs.NewServiceName("web", nil), + Gateway: structs.NewServiceName("ingress", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Protocol: "http", + Port: 2222, + Hosts: []string{"web.example.com"}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 26, + ModifyIndex: 26, + }, + }, + } + assert.Equal(t, expect, out) + }) + + t.Run("delete-ingress-entry", func(t *testing.T) { + // Deleting the config entry should remove existing mappings + assert.Nil(t, s.DeleteConfigEntry(28, "ingress-gateway", "ingress", nil)) + assert.True(t, watchFired(ws)) + + idx, out, err := s.DumpGatewayServices(ws) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(28)) + assert.Len(t, out, 0) + }) +} diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index d9a826a1b6..703cb66afc 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -773,7 +773,7 @@ func (s *state) handleUpdateUpstreams(u cache.UpdateEvent, snap *ConfigSnapshotU snap.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"): - resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) + resp, ok := u.Result.(*structs.IndexedNodesWithGateways) if !ok { return fmt.Errorf("invalid type for response: %T", u.Result) } @@ -1378,7 +1378,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho delete(snap.MeshGateway.ServiceGroups, sn) } case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"): - resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) + resp, ok := u.Result.(*structs.IndexedNodesWithGateways) if !ok { return fmt.Errorf("invalid type for response: %T", u.Result) } diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index a907144f84..19a0ed22b4 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -782,7 +782,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { events: []cache.UpdateEvent{ { CorrelationID: "mesh-gateway:dc4", - Result: &structs.IndexedCheckServiceNodes{ + Result: &structs.IndexedNodesWithGateways{ Nodes: TestGatewayNodesDC4Hostname(t), }, Err: nil, diff --git a/agent/structs/structs.go b/agent/structs/structs.go index d6fb989589..907d4ef95e 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1777,8 +1777,16 @@ func (n *ServiceName) Matches(o *ServiceName) bool { return true } -func (si *ServiceName) ToServiceID() ServiceID { - return ServiceID{ID: si.Name, EnterpriseMeta: si.EnterpriseMeta} +func (n *ServiceName) ToServiceID() ServiceID { + return ServiceID{ID: n.Name, EnterpriseMeta: n.EnterpriseMeta} +} + +func (n *ServiceName) LessThan(other *ServiceName) bool { + if n.EnterpriseMeta.LessThan(&other.EnterpriseMeta) { + return true + } + + return n.Name < other.Name } type ServiceList []ServiceName @@ -1815,6 +1823,12 @@ type IndexedCheckServiceNodes struct { QueryMeta } +type IndexedNodesWithGateways struct { + Nodes CheckServiceNodes + Gateways GatewayServices + QueryMeta +} + type DatacenterIndexedCheckServiceNodes struct { DatacenterNodes map[string]CheckServiceNodes QueryMeta diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index 91c76e6c95..277a2b190e 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -17,26 +17,27 @@ import ( const metaExternalSource = "external-source" type GatewayConfig struct { - Addresses []string `json:",omitempty"` + AssociatedServiceCount int `json:",omitempty"` + Addresses []string `json:",omitempty"` // internal to track uniqueness addressesSet map[string]struct{} } // ServiceSummary is used to summarize a service type ServiceSummary struct { - Kind structs.ServiceKind `json:",omitempty"` - Name string - Tags []string - Nodes []string - InstanceCount int - ProxyFor []string `json:",omitempty"` - proxyForSet map[string]struct{} // internal to track uniqueness - ChecksPassing int - ChecksWarning int - ChecksCritical int - ExternalSources []string - externalSourceSet map[string]struct{} // internal to track uniqueness - GatewayConfig GatewayConfig `json:",omitempty"` + Kind structs.ServiceKind `json:",omitempty"` + Name string + Tags []string + Nodes []string + InstanceCount int + ChecksPassing int + ChecksWarning int + ChecksCritical int + ExternalSources []string + externalSourceSet map[string]struct{} // internal to track uniqueness + GatewayConfig GatewayConfig `json:",omitempty"` + ConnectedWithProxy bool + ConnectedWithGateway bool structs.EnterpriseMeta } @@ -150,7 +151,7 @@ func (s *HTTPHandlers) UIServices(resp http.ResponseWriter, req *http.Request) ( s.parseFilter(req, &args.Filter) // Make the RPC request - var out structs.IndexedCheckServiceNodes + var out structs.IndexedNodesWithGateways defer setMeta(resp, &out.QueryMeta) RPC: if err := s.agent.RPC("Internal.ServiceDump", &args, &out); err != nil { @@ -164,7 +165,7 @@ RPC: // Generate the summary // TODO (gateways) (freddy) Have Internal.ServiceDump return ServiceDump instead. Need to add bexpr filtering for type. - return summarizeServices(out.Nodes.ToServiceDump(), s.agent.config, args.Datacenter), nil + return summarizeServices(out.Nodes.ToServiceDump(), out.Gateways, s.agent.config, args.Datacenter), nil } // UIGatewayServices is used to query all the nodes for services associated with a gateway along with their gateway config @@ -199,18 +200,23 @@ RPC: return nil, err } - return summarizeServices(out.Dump, s.agent.config, args.Datacenter), nil + return summarizeServices(out.Dump, nil, s.agent.config, args.Datacenter), nil } -func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, datacenter string) []*ServiceSummary { +// TODO (freddy): Refactor to split up for the two use cases +func summarizeServices(dump structs.ServiceDump, gateways structs.GatewayServices, cfg *config.RuntimeConfig, dc string) []*ServiceSummary { // Collect the summary information - var services []structs.ServiceID - summary := make(map[structs.ServiceID]*ServiceSummary) - getService := func(service structs.ServiceID) *ServiceSummary { + var services []structs.ServiceName + summary := make(map[structs.ServiceName]*ServiceSummary) + + linkedGateways := make(map[structs.ServiceName][]structs.ServiceName) + hasProxy := make(map[structs.ServiceName]bool) + + getService := func(service structs.ServiceName) *ServiceSummary { serv, ok := summary[service] if !ok { serv = &ServiceSummary{ - Name: service.ID, + Name: service.Name, EnterpriseMeta: service.EnterpriseMeta, // the other code will increment this unconditionally so we // shouldn't initialize it to 1 @@ -222,18 +228,27 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, data return serv } + // Collect the list of services linked to each gateway up front + // THis also allows tracking whether a service name is associated with a gateway + gsCount := make(map[structs.ServiceName]int) + + for _, gs := range gateways { + gsCount[gs.Gateway] += 1 + linkedGateways[gs.Service] = append(linkedGateways[gs.Service], gs.Gateway) + } + for _, csn := range dump { if csn.GatewayService != nil { gwsvc := csn.GatewayService - sum := getService(gwsvc.Service.ToServiceID()) - modifySummaryForGatewayService(cfg, datacenter, sum, gwsvc) + sum := getService(gwsvc.Service) + modifySummaryForGatewayService(cfg, dc, sum, gwsvc) } // Will happen in cases where we only have the GatewayServices mapping if csn.Service == nil { continue } - sid := structs.NewServiceID(csn.Service.Service, &csn.Service.EnterpriseMeta) + sid := structs.NewServiceName(csn.Service.Service, &csn.Service.EnterpriseMeta) sum := getService(sid) svc := csn.Service @@ -241,13 +256,7 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, data sum.Kind = svc.Kind sum.InstanceCount += 1 if svc.Kind == structs.ServiceKindConnectProxy { - if _, ok := sum.proxyForSet[svc.Proxy.DestinationServiceName]; !ok { - if sum.proxyForSet == nil { - sum.proxyForSet = make(map[string]struct{}) - } - sum.proxyForSet[svc.Proxy.DestinationServiceName] = struct{}{} - sum.ProxyFor = append(sum.ProxyFor, svc.Proxy.DestinationServiceName) - } + hasProxy[structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)] = true } for _, tag := range svc.Tags { found := false @@ -294,10 +303,23 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, data sort.Slice(services, func(i, j int) bool { return services[i].LessThan(&services[j]) }) + output := make([]*ServiceSummary, len(summary)) for idx, service := range services { - // Sort the nodes and tags sum := summary[service] + if hasProxy[service] { + sum.ConnectedWithProxy = true + } + + // Verify that at least one of the gateways linked by config entry has an instance registered in the catalog + for _, gw := range linkedGateways[service] { + if s := summary[gw]; s != nil && s.InstanceCount > 0 { + sum.ConnectedWithGateway = true + } + } + sum.GatewayConfig.AssociatedServiceCount = gsCount[service] + + // Sort the nodes and tags sort.Strings(sum.Nodes) sort.Strings(sum.Tags) output[idx] = sum diff --git a/agent/ui_endpoint_test.go b/agent/ui_endpoint_test.go index 35b1ae191e..5291398cdb 100644 --- a/agent/ui_endpoint_test.go +++ b/agent/ui_endpoint_test.go @@ -297,6 +297,64 @@ func TestUiServices(t *testing.T) { require.NoError(t, a.RPC("Catalog.Register", args, &out)) } + // Register a terminating gateway associated with api and cache + { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "terminating-gateway", + Service: "terminating-gateway", + Kind: structs.ServiceKindTerminatingGateway, + Port: 443, + }, + } + var regOutput struct{} + require.NoError(t, a.RPC("Catalog.Register", &arg, ®Output)) + + args := &structs.TerminatingGatewayConfigEntry{ + Name: "terminating-gateway", + Kind: structs.TerminatingGateway, + Services: []structs.LinkedService{ + { + Name: "api", + }, + { + Name: "cache", + }, + }, + } + + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + } + var configOutput bool + require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + + // Web should not show up as ConnectedWithGateway since this one does not have any instances + args = &structs.TerminatingGatewayConfigEntry{ + Name: "other-terminating-gateway", + Kind: structs.TerminatingGateway, + Services: []structs.LinkedService{ + { + Name: "web", + }, + }, + } + + req = structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + } + require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + } + t.Run("No Filter", func(t *testing.T) { t.Parallel() req, _ := http.NewRequest("GET", "/v1/internal/ui/services/dc1", nil) @@ -307,36 +365,38 @@ func TestUiServices(t *testing.T) { // Should be 2 nodes, and all the empty lists should be non-nil summary := obj.([]*ServiceSummary) - require.Len(t, summary, 4) + require.Len(t, summary, 5) // internal accounting that users don't see can be blown away for _, sum := range summary { sum.externalSourceSet = nil - sum.proxyForSet = nil } expected := []*ServiceSummary{ { - Kind: structs.ServiceKindTypical, - Name: "api", - Tags: []string{"tag1", "tag2"}, - Nodes: []string{"foo"}, - InstanceCount: 1, - ChecksPassing: 2, - ChecksWarning: 1, - ChecksCritical: 0, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + Kind: structs.ServiceKindTypical, + Name: "api", + Tags: []string{"tag1", "tag2"}, + Nodes: []string{"foo"}, + InstanceCount: 1, + ChecksPassing: 2, + ChecksWarning: 1, + ChecksCritical: 0, + ConnectedWithProxy: true, + ConnectedWithGateway: true, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), }, { - Kind: structs.ServiceKindTypical, - Name: "cache", - Tags: nil, - Nodes: []string{"zip"}, - InstanceCount: 1, - ChecksPassing: 0, - ChecksWarning: 0, - ChecksCritical: 0, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + Kind: structs.ServiceKindTypical, + Name: "cache", + Tags: nil, + Nodes: []string{"zip"}, + InstanceCount: 1, + ChecksPassing: 0, + ChecksWarning: 0, + ChecksCritical: 0, + ConnectedWithGateway: true, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), }, { Kind: structs.ServiceKindConnectProxy, @@ -344,7 +404,6 @@ func TestUiServices(t *testing.T) { Tags: nil, Nodes: []string{"bar", "foo"}, InstanceCount: 2, - ProxyFor: []string{"api"}, ChecksPassing: 2, ChecksWarning: 1, ChecksCritical: 1, @@ -362,7 +421,19 @@ func TestUiServices(t *testing.T) { ChecksCritical: 0, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), }, + { + Kind: structs.ServiceKindTerminatingGateway, + Name: "terminating-gateway", + Tags: nil, + Nodes: []string{"foo"}, + InstanceCount: 1, + ChecksPassing: 2, + ChecksWarning: 1, + GatewayConfig: GatewayConfig{AssociatedServiceCount: 2}, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, } + require.ElementsMatch(t, expected, summary) }) @@ -381,20 +452,21 @@ func TestUiServices(t *testing.T) { // internal accounting that users don't see can be blown away for _, sum := range summary { sum.externalSourceSet = nil - sum.proxyForSet = nil } expected := []*ServiceSummary{ { - Kind: structs.ServiceKindTypical, - Name: "api", - Tags: []string{"tag1", "tag2"}, - Nodes: []string{"foo"}, - InstanceCount: 1, - ChecksPassing: 2, - ChecksWarning: 1, - ChecksCritical: 0, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + Kind: structs.ServiceKindTypical, + Name: "api", + Tags: []string{"tag1", "tag2"}, + Nodes: []string{"foo"}, + InstanceCount: 1, + ChecksPassing: 2, + ChecksWarning: 1, + ChecksCritical: 0, + ConnectedWithProxy: true, + ConnectedWithGateway: false, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), }, { Kind: structs.ServiceKindConnectProxy, @@ -402,7 +474,6 @@ func TestUiServices(t *testing.T) { Tags: nil, Nodes: []string{"bar", "foo"}, InstanceCount: 2, - ProxyFor: []string{"api"}, ChecksPassing: 2, ChecksWarning: 1, ChecksCritical: 1,