Implement Kind based ServiceDump and caching of the ServiceDump RPC

This commit is contained in:
Matt Keeler 2019-06-20 15:04:39 -04:00
parent ef8e5747eb
commit 3943e38133
10 changed files with 371 additions and 4 deletions

View File

@ -3937,6 +3937,14 @@ func (a *Agent) registerCache() {
}, &cache.RegisterOptions{ }, &cache.RegisterOptions{
Refresh: false, Refresh: false,
}) })
a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{
RPC: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
} }
// defaultProxyCommand returns the default Connect managed proxy command. // defaultProxyCommand returns the default Connect managed proxy command.

View File

@ -0,0 +1,51 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const InternalServiceDumpName = "service-dump"
// InternalServiceDump supports fetching discovering service names via the catalog.
type InternalServiceDump struct {
RPC RPC
}
func (c *InternalServiceDump) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a ServiceDumpRequest.
reqReal, ok := req.(*structs.ServiceDumpRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.IndexedCheckServiceNodes
if err := c.RPC.RPC("Internal.ServiceDump", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *InternalServiceDump) SupportsBlocking() bool {
return true
}

View File

@ -0,0 +1,63 @@
package cachetype
import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestInternalServiceDump(t *testing.T) {
rpc := TestRPC(t)
typ := &InternalServiceDump{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedCheckServiceNodes
rpc.On("RPC", "Internal.ServiceDump", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceDumpRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedCheckServiceNodes)
reply.Nodes = []structs.CheckServiceNode{
{Service: &structs.NodeService{Kind: req.ServiceKind, Service: "foo"}},
}
reply.QueryMeta.Index = 48
resp = reply
})
// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceDumpRequest{
Datacenter: "dc1",
ServiceKind: structs.ServiceKindMeshGateway,
UseServiceKind: true,
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)
rpc.AssertExpectations(t)
}
func TestInternalServiceDump_badReqType(t *testing.T) {
rpc := TestRPC(t)
typ := &CatalogServices{RPC: rpc}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(t, err)
require.Contains(t, err.Error(), "wrong type")
rpc.AssertExpectations(t)
}

View File

@ -493,6 +493,49 @@ func registerTestCatalogEntries(t *testing.T, codec rpc.ClientCodec) {
}, },
} }
registerTestCatalogEntriesMap(t, codec, registrations)
}
func registerTestCatalogEntries2(t *testing.T, codec rpc.ClientCodec) {
t.Helper()
registrations := map[string]*structs.RegisterRequest{
"Service mg-gw": &structs.RegisterRequest{
Datacenter: "dc1",
Node: "gateway",
ID: types.NodeID("72e18a4c-85ec-4520-978f-2fc0378b06aa"),
Address: "10.1.2.3",
Service: &structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mg-gw-01",
Service: "mg-gw",
Port: 8443,
Address: "198.18.1.4",
},
},
"Service rproxy": &structs.RegisterRequest{
Datacenter: "dc1",
Node: "proxy",
ID: types.NodeID("2d31602c-3291-4f94-842d-446bc2f945ce"),
Address: "10.1.2.4",
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8443,
Address: "198.18.1.5",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
},
},
},
}
registerTestCatalogEntriesMap(t, codec, registrations)
}
func registerTestCatalogEntriesMap(t *testing.T, codec rpc.ClientCodec, registrations map[string]*structs.RegisterRequest) {
t.Helper()
for name, reg := range registrations { for name, reg := range registrations {
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil) err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
require.NoError(t, err, "Failed catalog registration %q: %v", name, err) require.NoError(t, err, "Failed catalog registration %q: %v", name, err)

View File

@ -76,7 +76,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
}) })
} }
func (m *Internal) ServiceDump(args *structs.DCSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedCheckServiceNodes) error {
if done, err := m.srv.forward("Internal.ServiceDump", args, args, reply); done { if done, err := m.srv.forward("Internal.ServiceDump", args, args, reply); done {
return err return err
} }
@ -90,7 +90,7 @@ func (m *Internal) ServiceDump(args *structs.DCSpecificRequest, reply *structs.I
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
index, nodes, err := state.ServiceDump(ws) index, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind)
if err != nil { if err != nil {
return err return err
} }

View File

@ -483,3 +483,52 @@ func TestInternal_ServiceDump(t *testing.T) {
require.Len(t, nodes, 3) require.Len(t, nodes, 3)
}) })
} }
func TestInternal_ServiceDump_Kind(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// prep the cluster with some data we can use in our filters
registerTestCatalogEntries(t, codec)
registerTestCatalogEntries2(t, codec)
doRequest := func(t *testing.T, kind structs.ServiceKind) structs.CheckServiceNodes {
t.Helper()
args := structs.ServiceDumpRequest{
Datacenter: "dc1",
ServiceKind: kind,
UseServiceKind: true,
}
var out structs.IndexedCheckServiceNodes
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out))
return out.Nodes
}
// Run the tests against the test server
t.Run("Typical", func(t *testing.T) {
nodes := doRequest(t, structs.ServiceKindTypical)
// redis (3), web (3), critical (1), warning (1) and consul (1)
require.Len(t, nodes, 9)
})
t.Run("Mesh Gateway", func(t *testing.T) {
nodes := doRequest(t, structs.ServiceKindMeshGateway)
require.Len(t, nodes, 1)
require.Equal(t, "mg-gw", nodes[0].Service.Service)
require.Equal(t, "mg-gw-01", nodes[0].Service.ID)
})
t.Run("Connect Proxy", func(t *testing.T) {
nodes := doRequest(t, structs.ServiceKindConnectProxy)
require.Len(t, nodes, 1)
require.Equal(t, "web-proxy", nodes[0].Service.Service)
require.Equal(t, "web-proxy", nodes[0].Service.ID)
})
}

View File

@ -102,6 +102,12 @@ func servicesTableSchema() *memdb.TableSchema {
Unique: false, Unique: false,
Indexer: &IndexConnectService{}, Indexer: &IndexConnectService{},
}, },
"kind": &memdb.IndexSchema{
Name: "kind",
AllowMissing: false,
Unique: false,
Indexer: &IndexServiceKind{},
},
}, },
} }
} }
@ -848,6 +854,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{serviceKindIndexName(svc.Kind), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil return nil
} }
@ -1330,6 +1339,16 @@ func serviceIndexName(name string) string {
return fmt.Sprintf("service.%s", name) return fmt.Sprintf("service.%s", name)
} }
func serviceKindIndexName(kind structs.ServiceKind) string {
switch kind {
case structs.ServiceKindTypical:
// needs a special case here
return "service_kind.typical"
default:
return "service_kind." + string(kind)
}
}
// deleteServiceCASTxn is used to try doing a service delete operation with a given // deleteServiceCASTxn is used to try doing a service delete operation with a given
// raft index. If the CAS index specified is not equal to the last observed index for // raft index. If the CAS index specified is not equal to the last observed index for
// the given service, then the call is a noop, otherwise a normal delete is invoked. // the given service, then the call is a noop, otherwise a normal delete is invoked.
@ -1454,6 +1473,9 @@ func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{serviceKindIndexName(svc.Kind), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} }
return nil return nil
} }
@ -1544,6 +1566,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
if err = tx.Insert("index", &IndexEntry{serviceKindIndexName(svc.ServiceKind), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} }
} else { } else {
if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) { if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) {
@ -2155,10 +2180,18 @@ func (s *Store) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) {
return s.parseNodes(tx, ws, idx, nodes) return s.parseNodes(tx, ws, idx, nodes)
} }
func (s *Store) ServiceDump(ws memdb.WatchSet) (uint64, structs.CheckServiceNodes, error) { func (s *Store) ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool) (uint64, structs.CheckServiceNodes, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
if useKind {
return s.serviceDumpKindTxn(tx, ws, kind)
} else {
return s.serviceDumpAllTxn(tx, ws)
}
}
func (s *Store) serviceDumpAllTxn(tx *memdb.Txn, ws memdb.WatchSet) (uint64, structs.CheckServiceNodes, error) {
// Get the table index // Get the table index
idx := maxIndexWatchTxn(tx, ws, "nodes", "services", "checks") idx := maxIndexWatchTxn(tx, ws, "nodes", "services", "checks")
@ -2176,6 +2209,27 @@ func (s *Store) ServiceDump(ws memdb.WatchSet) (uint64, structs.CheckServiceNode
return s.parseCheckServiceNodes(tx, nil, idx, "", results, err) return s.parseCheckServiceNodes(tx, nil, idx, "", results, err)
} }
func (s *Store) serviceDumpKindTxn(tx *memdb.Txn, ws memdb.WatchSet, kind structs.ServiceKind) (uint64, structs.CheckServiceNodes, error) {
// unlike when we are dumping all services here we only need to watch the kind specific index entry for changing (or nodes, checks)
// updating any services, nodes or checks will bump the appropriate service kind index so there is no need to watch any of the individual
// entries
idx := maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind))
// Query the state store for the service.
services, err := tx.Get("services", "kind", string(kind))
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
var results structs.ServiceNodes
for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode)
results = append(results, sn)
}
return s.parseCheckServiceNodes(tx, nil, idx, "", results, err)
}
// parseNodes takes an iterator over a set of nodes and returns a struct // parseNodes takes an iterator over a set of nodes and returns a struct
// containing the nodes along with all of their associated services // containing the nodes along with all of their associated services
// and/or health checks. // and/or health checks.

View File

@ -0,0 +1,36 @@
package state
import (
"fmt"
"strings"
"github.com/hashicorp/consul/agent/structs"
)
// IndexServiceKind indexes a *struct.ServiceNode for querying by
// the services kind. We need a custom indexer because of the default
// kind being the empty string
type IndexServiceKind struct{}
func (idx *IndexServiceKind) FromObject(obj interface{}) (bool, []byte, error) {
sn, ok := obj.(*structs.ServiceNode)
if !ok {
return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj)
}
return true, append([]byte(strings.ToLower(string(sn.ServiceKind))), '\x00'), nil
}
func (idx *IndexServiceKind) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
arg, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a structs.ServiceKind: %#v", args[0])
}
// Add the null character as a terminator
return append([]byte(strings.ToLower(arg)), '\x00'), nil
}

View File

@ -380,6 +380,55 @@ func (r *DCSpecificRequest) CacheMinIndex() uint64 {
return r.QueryOptions.MinQueryIndex return r.QueryOptions.MinQueryIndex
} }
type ServiceDumpRequest struct {
Datacenter string
ServiceKind ServiceKind
UseServiceKind bool
Source QuerySource
QueryOptions
}
func (r *ServiceDumpRequest) RequestDatacenter() string {
return r.Datacenter
}
func (r *ServiceDumpRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
Token: r.Token,
Datacenter: r.Datacenter,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MaxAge: r.MaxAge,
MustRevalidate: r.MustRevalidate,
}
// When we are not using the service kind we want to normalize the ServiceKind
keyKind := ServiceKindTypical
if r.UseServiceKind {
keyKind = r.ServiceKind
}
// To calculate the cache key we only hash the node meta filters and the bexpr filter.
// The datacenter is handled by the cache framework. The other fields are
// not, but should not be used in any cache types.
v, err := hashstructure.Hash([]interface{}{
keyKind,
r.UseServiceKind,
r.Filter,
}, nil)
if err == nil {
// If there is an error, we don't set the key. A blank key forces
// no cache for this request so the request is forwarded directly
// to the server.
info.Key = strconv.FormatUint(v, 10)
}
return info
}
func (r *ServiceDumpRequest) CacheMinIndex() uint64 {
return r.QueryOptions.MinQueryIndex
}
// ServiceSpecificRequest is used to query about a specific service // ServiceSpecificRequest is used to query about a specific service
type ServiceSpecificRequest struct { type ServiceSpecificRequest struct {
Datacenter string Datacenter string
@ -710,6 +759,20 @@ const (
ServiceKindConnectProxy ServiceKind = "connect-proxy" ServiceKindConnectProxy ServiceKind = "connect-proxy"
) )
func ServiceKindFromString(kind string) (ServiceKind, error) {
switch kind {
case string(ServiceKindTypical):
return ServiceKindTypical, nil
case string(ServiceKindConnectProxy):
return ServiceKindConnectProxy, nil
case string(ServiceKindMeshGateway):
return ServiceKindMeshGateway, nil
default:
// have to return something and it may as well be typical
return ServiceKindTypical, fmt.Errorf("Invalid service kind: %s", kind)
}
}
// Type to hold a address and port of a service // Type to hold a address and port of a service
type ServiceAddress struct { type ServiceAddress struct {
Address string Address string

View File

@ -116,7 +116,7 @@ RPC:
// ServiceSummary which provides overview information for the service // ServiceSummary which provides overview information for the service
func (s *HTTPServer) UIServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) UIServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Parse arguments // Parse arguments
args := structs.DCSpecificRequest{} args := structs.ServiceDumpRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil return nil, nil
} }