diff --git a/agent/agent.go b/agent/agent.go index fbb21d7e06..86f0e6c5c8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3937,6 +3937,14 @@ func (a *Agent) registerCache() { }, &cache.RegisterOptions{ Refresh: false, }) + + a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{ + RPC: a, + }, &cache.RegisterOptions{ + Refresh: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + }) } // defaultProxyCommand returns the default Connect managed proxy command. diff --git a/agent/cache-types/service_dump.go b/agent/cache-types/service_dump.go new file mode 100644 index 0000000000..868c534e7a --- /dev/null +++ b/agent/cache-types/service_dump.go @@ -0,0 +1,51 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const InternalServiceDumpName = "service-dump" + +// InternalServiceDump supports fetching discovering service names via the catalog. +type InternalServiceDump struct { + RPC RPC +} + +func (c *InternalServiceDump) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a ServiceDumpRequest. + reqReal, ok := req.(*structs.ServiceDumpRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting leader if the request is + // going to be served from cache and end up arbitrarily stale anyway. This + // allows cached service-discover to automatically read scale across all + // servers too. + reqReal.AllowStale = true + + // Fetch + var reply structs.IndexedCheckServiceNodes + if err := c.RPC.RPC("Internal.ServiceDump", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} + +func (c *InternalServiceDump) SupportsBlocking() bool { + return true +} diff --git a/agent/cache-types/service_dump_test.go b/agent/cache-types/service_dump_test.go new file mode 100644 index 0000000000..4c355e13f6 --- /dev/null +++ b/agent/cache-types/service_dump_test.go @@ -0,0 +1,63 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestInternalServiceDump(t *testing.T) { + rpc := TestRPC(t) + typ := &InternalServiceDump{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.IndexedCheckServiceNodes + rpc.On("RPC", "Internal.ServiceDump", mock.Anything, mock.Anything).Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.ServiceDumpRequest) + require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) + require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) + require.True(t, req.AllowStale) + + reply := args.Get(2).(*structs.IndexedCheckServiceNodes) + reply.Nodes = []structs.CheckServiceNode{ + {Service: &structs.NodeService{Kind: req.ServiceKind, Service: "foo"}}, + } + reply.QueryMeta.Index = 48 + resp = reply + }) + + // Fetch + resultA, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.ServiceDumpRequest{ + Datacenter: "dc1", + ServiceKind: structs.ServiceKindMeshGateway, + UseServiceKind: true, + }) + require.NoError(t, err) + require.Equal(t, cache.FetchResult{ + Value: resp, + Index: 48, + }, resultA) + + rpc.AssertExpectations(t) +} + +func TestInternalServiceDump_badReqType(t *testing.T) { + rpc := TestRPC(t) + typ := &CatalogServices{RPC: rpc} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(t, err) + require.Contains(t, err.Error(), "wrong type") + rpc.AssertExpectations(t) +} diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 678ef033fd..87037cf594 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -493,6 +493,49 @@ func registerTestCatalogEntries(t *testing.T, codec rpc.ClientCodec) { }, } + registerTestCatalogEntriesMap(t, codec, registrations) +} + +func registerTestCatalogEntries2(t *testing.T, codec rpc.ClientCodec) { + t.Helper() + + registrations := map[string]*structs.RegisterRequest{ + "Service mg-gw": &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "gateway", + ID: types.NodeID("72e18a4c-85ec-4520-978f-2fc0378b06aa"), + Address: "10.1.2.3", + Service: &structs.NodeService{ + Kind: structs.ServiceKindMeshGateway, + ID: "mg-gw-01", + Service: "mg-gw", + Port: 8443, + Address: "198.18.1.4", + }, + }, + "Service rproxy": &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "proxy", + ID: types.NodeID("2d31602c-3291-4f94-842d-446bc2f945ce"), + Address: "10.1.2.4", + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Port: 8443, + Address: "198.18.1.5", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + }, + }, + }, + } + + registerTestCatalogEntriesMap(t, codec, registrations) +} + +func registerTestCatalogEntriesMap(t *testing.T, codec rpc.ClientCodec, registrations map[string]*structs.RegisterRequest) { + t.Helper() for name, reg := range registrations { err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil) require.NoError(t, err, "Failed catalog registration %q: %v", name, err) diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 9557ecbe32..a5ec9874e2 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -76,7 +76,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, }) } -func (m *Internal) ServiceDump(args *structs.DCSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { +func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedCheckServiceNodes) error { if done, err := m.srv.forward("Internal.ServiceDump", args, args, reply); done { return err } @@ -90,7 +90,7 @@ func (m *Internal) ServiceDump(args *structs.DCSpecificRequest, reply *structs.I &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - index, nodes, err := state.ServiceDump(ws) + index, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind) if err != nil { return err } diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index eee9569f43..578d70e4f2 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -483,3 +483,52 @@ func TestInternal_ServiceDump(t *testing.T) { require.Len(t, nodes, 3) }) } + +func TestInternal_ServiceDump_Kind(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // prep the cluster with some data we can use in our filters + registerTestCatalogEntries(t, codec) + registerTestCatalogEntries2(t, codec) + + doRequest := func(t *testing.T, kind structs.ServiceKind) structs.CheckServiceNodes { + t.Helper() + args := structs.ServiceDumpRequest{ + Datacenter: "dc1", + ServiceKind: kind, + UseServiceKind: true, + } + + var out structs.IndexedCheckServiceNodes + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out)) + return out.Nodes + } + + // Run the tests against the test server + t.Run("Typical", func(t *testing.T) { + nodes := doRequest(t, structs.ServiceKindTypical) + // redis (3), web (3), critical (1), warning (1) and consul (1) + require.Len(t, nodes, 9) + }) + + t.Run("Mesh Gateway", func(t *testing.T) { + nodes := doRequest(t, structs.ServiceKindMeshGateway) + require.Len(t, nodes, 1) + require.Equal(t, "mg-gw", nodes[0].Service.Service) + require.Equal(t, "mg-gw-01", nodes[0].Service.ID) + }) + + t.Run("Connect Proxy", func(t *testing.T) { + nodes := doRequest(t, structs.ServiceKindConnectProxy) + require.Len(t, nodes, 1) + require.Equal(t, "web-proxy", nodes[0].Service.Service) + require.Equal(t, "web-proxy", nodes[0].Service.ID) + }) +} diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 5f9c318272..5e3ad9c9e3 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -102,6 +102,12 @@ func servicesTableSchema() *memdb.TableSchema { Unique: false, Indexer: &IndexConnectService{}, }, + "kind": &memdb.IndexSchema{ + Name: "kind", + AllowMissing: false, + Unique: false, + Indexer: &IndexServiceKind{}, + }, }, } } @@ -848,6 +854,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } + if err := tx.Insert("index", &IndexEntry{serviceKindIndexName(svc.Kind), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } return nil } @@ -1330,6 +1339,16 @@ func serviceIndexName(name string) string { return fmt.Sprintf("service.%s", name) } +func serviceKindIndexName(kind structs.ServiceKind) string { + switch kind { + case structs.ServiceKindTypical: + // needs a special case here + return "service_kind.typical" + default: + return "service_kind." + string(kind) + } +} + // deleteServiceCASTxn is used to try doing a service delete operation with a given // raft index. If the CAS index specified is not equal to the last observed index for // the given service, then the call is a noop, otherwise a normal delete is invoked. @@ -1454,6 +1473,9 @@ func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } + if err := tx.Insert("index", &IndexEntry{serviceKindIndexName(svc.Kind), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } } return nil } @@ -1544,6 +1566,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } + if err = tx.Insert("index", &IndexEntry{serviceKindIndexName(svc.ServiceKind), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } } } else { if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) { @@ -2155,10 +2180,18 @@ func (s *Store) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) { return s.parseNodes(tx, ws, idx, nodes) } -func (s *Store) ServiceDump(ws memdb.WatchSet) (uint64, structs.CheckServiceNodes, error) { +func (s *Store) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool) (uint64, structs.CheckServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() + if useKind { + return s.serviceDumpKindTxn(tx, ws, kind) + } else { + return s.serviceDumpAllTxn(tx, ws) + } +} + +func (s *Store) serviceDumpAllTxn(tx *memdb.Txn, ws memdb.WatchSet) (uint64, structs.CheckServiceNodes, error) { // Get the table index idx := maxIndexWatchTxn(tx, ws, "nodes", "services", "checks") @@ -2176,6 +2209,27 @@ func (s *Store) ServiceDump(ws memdb.WatchSet) (uint64, structs.CheckServiceNode return s.parseCheckServiceNodes(tx, nil, idx, "", results, err) } +func (s *Store) serviceDumpKindTxn(tx *memdb.Txn, ws memdb.WatchSet, kind structs.ServiceKind) (uint64, structs.CheckServiceNodes, error) { + // unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks) + // updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual + // entries + idx := maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind)) + + // Query the state store for the service. + services, err := tx.Get("services", "kind", string(kind)) + if err != nil { + return 0, nil, fmt.Errorf("failed service lookup: %s", err) + } + + var results structs.ServiceNodes + for service := services.Next(); service != nil; service = services.Next() { + sn := service.(*structs.ServiceNode) + results = append(results, sn) + } + + return s.parseCheckServiceNodes(tx, nil, idx, "", results, err) +} + // parseNodes takes an iterator over a set of nodes and returns a struct // containing the nodes along with all of their associated services // and/or health checks. diff --git a/agent/consul/state/index_service_kind.go b/agent/consul/state/index_service_kind.go new file mode 100644 index 0000000000..4779599068 --- /dev/null +++ b/agent/consul/state/index_service_kind.go @@ -0,0 +1,36 @@ +package state + +import ( + "fmt" + "strings" + + "github.com/hashicorp/consul/agent/structs" +) + +// IndexServiceKind indexes a *struct.ServiceNode for querying by +// the services kind. We need a custom indexer because of the default +// kind being the empty string +type IndexServiceKind struct{} + +func (idx *IndexServiceKind) FromObject(obj interface{}) (bool, []byte, error) { + sn, ok := obj.(*structs.ServiceNode) + if !ok { + return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj) + } + + return true, append([]byte(strings.ToLower(string(sn.ServiceKind))), '\x00'), nil +} + +func (idx *IndexServiceKind) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + + arg, ok := args[0].(string) + if !ok { + return nil, fmt.Errorf("argument must be a structs.ServiceKind: %#v", args[0]) + } + + // Add the null character as a terminator + return append([]byte(strings.ToLower(arg)), '\x00'), nil +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 80ba8b1533..4c35043858 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -380,6 +380,55 @@ func (r *DCSpecificRequest) CacheMinIndex() uint64 { return r.QueryOptions.MinQueryIndex } +type ServiceDumpRequest struct { + Datacenter string + ServiceKind ServiceKind + UseServiceKind bool + Source QuerySource + QueryOptions +} + +func (r *ServiceDumpRequest) RequestDatacenter() string { + return r.Datacenter +} + +func (r *ServiceDumpRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + Token: r.Token, + Datacenter: r.Datacenter, + MinIndex: r.MinQueryIndex, + Timeout: r.MaxQueryTime, + MaxAge: r.MaxAge, + MustRevalidate: r.MustRevalidate, + } + + // When we are not using the service kind we want to normalize the ServiceKind + keyKind := ServiceKindTypical + if r.UseServiceKind { + keyKind = r.ServiceKind + } + // To calculate the cache key we only hash the node meta filters and the bexpr filter. + // The datacenter is handled by the cache framework. The other fields are + // not, but should not be used in any cache types. + v, err := hashstructure.Hash([]interface{}{ + keyKind, + r.UseServiceKind, + r.Filter, + }, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + +func (r *ServiceDumpRequest) CacheMinIndex() uint64 { + return r.QueryOptions.MinQueryIndex +} + // ServiceSpecificRequest is used to query about a specific service type ServiceSpecificRequest struct { Datacenter string @@ -710,6 +759,20 @@ const ( ServiceKindConnectProxy ServiceKind = "connect-proxy" ) +func ServiceKindFromString(kind string) (ServiceKind, error) { + switch kind { + case string(ServiceKindTypical): + return ServiceKindTypical, nil + case string(ServiceKindConnectProxy): + return ServiceKindConnectProxy, nil + case string(ServiceKindMeshGateway): + return ServiceKindMeshGateway, nil + default: + // have to return something and it may as well be typical + return ServiceKindTypical, fmt.Errorf("Invalid service kind: %s", kind) + } +} + // Type to hold a address and port of a service type ServiceAddress struct { Address string diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index 67a74faf2a..0e3035afdd 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -116,7 +116,7 @@ RPC: // ServiceSummary which provides overview information for the service func (s *HTTPServer) UIServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Parse arguments - args := structs.DCSpecificRequest{} + args := structs.ServiceDumpRequest{} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil }