Add support for filtering the 'List Services' API

1. Create a bexpr filter for performing the filtering
2. Change the state store functions to return the raw (not aggregated)
   list of ServiceNodes.
3. Move the aggregate service tags by name logic out of the state store
   functions into a new function called from the RPC endpoint
4. Perform the filtering in the endpoint before aggregation.
This commit is contained in:
Daniel Kimsey 2022-08-10 16:52:32 -05:00
parent 56b12ad3a3
commit 3c4fa9b468
7 changed files with 177 additions and 82 deletions

3
.changelog/11742.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
api: Add filtering support to Catalog's List Services (v1/catalog/services)
```

View File

@ -565,6 +565,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err
}
filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{})
if err != nil {
return err
}
// Set reply enterprise metadata after resolving and validating the token so
// that we can properly infer metadata from the token.
reply.EnterpriseMeta = args.EnterpriseMeta
@ -574,10 +579,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var err error
var serviceNodes structs.ServiceNodes
if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName)
reply.Index, serviceNodes, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName)
} else {
reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
reply.Index, serviceNodes, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
}
if err != nil {
return err
@ -588,11 +594,43 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return nil
}
raw, err := filter.Execute(serviceNodes)
if err != nil {
return err
}
reply.Services = servicesTagsByName(raw.(structs.ServiceNodes))
c.srv.filterACLWithAuthorizer(authz, reply)
return nil
})
}
func servicesTagsByName(services []*structs.ServiceNode) structs.Services {
unique := make(map[string]map[string]struct{})
for _, svc := range services {
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return results
}
// ServiceList is used to query the services in a DC.
// Returns services as a list of ServiceNames.
func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error {

View File

@ -1523,6 +1523,45 @@ func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {
}
}
func TestCatalog_ListServices_Filter(t *testing.T) {
t.Parallel()
_, s1 := testServer(t)
codec := rpcClient(t, s1)
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// prep the cluster with some data we can use in our filters
registerTestCatalogEntries(t, codec)
// Run the tests against the test server
t.Run("ListServices", func(t *testing.T) {
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
args.Filter = "ServiceName == redis"
out := new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Contains(t, out.Services, "redis")
require.ElementsMatch(t, []string{"v1", "v2"}, out.Services["redis"])
args.Filter = "NodeMeta.os == NoSuchOS"
out = new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Len(t, out.Services, 0)
args.Filter = "NodeMeta.NoSuchMetadata == linux"
out = new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Len(t, out.Services, 0)
args.Filter = "InvalidField == linux"
out = new(structs.IndexedServices)
require.Error(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
})
}
func TestCatalog_ListServices_Blocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -1134,7 +1134,7 @@ func terminatingGatewayVirtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool,
}
// Services returns all services along with a list of associated tags.
func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) {
func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1148,30 +1148,11 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerNam
}
ws.Add(services.WatchCh())
// Rip through the services and enumerate them and their unique set of
// tags.
unique := make(map[string]map[string]struct{})
var result []*structs.ServiceNode
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
result = append(result, service.(*structs.ServiceNode))
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return idx, results, nil
return idx, result, nil
}
func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) {
@ -1212,7 +1193,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *acl.EnterpriseMeta,
}
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) {
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1259,8 +1240,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
}
allServicesCh := allServices.WatchCh()
// Populate the services map
unique := make(map[string]map[string]struct{})
var result structs.ServiceNodes
for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) {
@ -1274,30 +1254,11 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
}
ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh)
// Rip through the services and enumerate them and their unique set of
// tags.
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
result = append(result, service.(*structs.ServiceNode))
}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return idx, results, nil
return idx, result, nil
}
// maxIndexForService return the maximum Raft Index for a service

View File

@ -12,6 +12,8 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/assert"
@ -2105,7 +2107,7 @@ func TestStateStore_Services(t *testing.T) {
if err := s.EnsureService(2, "node1", ns1); err != nil {
t.Fatalf("err: %s", err)
}
testRegisterService(t, s, 3, "node1", "dogs")
ns1Dogs := testRegisterService(t, s, 3, "node1", "dogs")
testRegisterNode(t, s, 4, "node2")
ns2 := &structs.NodeService{
ID: "service3",
@ -2131,19 +2133,13 @@ func TestStateStore_Services(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Verify the result. We sort the lists since the order is
// non-deterministic (it's built using a map internally).
expected := structs.Services{
"redis": []string{"prod", "primary", "replica"},
"dogs": []string{},
}
sort.Strings(expected["redis"])
for _, tags := range services {
sort.Strings(tags)
}
if !reflect.DeepEqual(expected, services) {
t.Fatalf("bad: %#v", services)
// Verify the result.
expected := []*structs.ServiceNode{
ns1Dogs.ToServiceNode("node1"),
ns1.ToServiceNode("node1"),
ns2.ToServiceNode("node2"),
}
assertDeepEqual(t, services, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
// Deleting a node with a service should fire the watch.
if err := s.DeleteNode(6, "node1", nil, ""); err != nil {
@ -2206,11 +2202,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{
"redis": []string{"primary", "prod"},
expected := []*structs.ServiceNode{
ns1.ToServiceNode("node0"),
}
sort.Strings(res["redis"])
require.Equal(t, expected, res)
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Get all services using the common meta value", func(t *testing.T) {
@ -2218,11 +2213,12 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{
"redis": []string{"primary", "prod", "replica"},
require.Len(t, res, 2)
expected := []*structs.ServiceNode{
ns1.ToServiceNode("node0"),
ns2.ToServiceNode("node1"),
}
sort.Strings(res["redis"])
require.Equal(t, expected, res)
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Get an empty list for an invalid meta value", func(t *testing.T) {
@ -2230,8 +2226,8 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{}
require.Equal(t, expected, res)
var expected []*structs.ServiceNode
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Get the first node's service instance using multiple meta filters", func(t *testing.T) {
@ -2239,11 +2235,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{
"redis": []string{"primary", "prod"},
expected := []*structs.ServiceNode{
ns1.ToServiceNode("node0"),
}
sort.Strings(res["redis"])
require.Equal(t, expected, res)
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Registering some unrelated node + service should not fire the watch.", func(t *testing.T) {
@ -8807,3 +8802,10 @@ func setVirtualIPFlags(t *testing.T, s *Store) {
Value: "true",
}))
}
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}

View File

@ -146,13 +146,13 @@ func testRegisterServiceOpts(t *testing.T, s *Store, idx uint64, nodeID, service
// testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated
// even if service already exists if using `modifyAccordingIndex`.
// This is done by setting the transaction ID in "version" meta so service will be updated if it already exists
func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex)
func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) *structs.NodeService {
return testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex)
}
// testRegisterServiceWithChangeOpts is the same as testRegisterServiceWithChange with the addition of opts that can
// modify the service prior to writing.
func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) {
func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) *structs.NodeService {
meta := make(map[string]string)
if modifyAccordingIndex {
meta["version"] = fmt.Sprint(idx)
@ -183,14 +183,15 @@ func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeI
result.ServiceID != serviceID {
t.Fatalf("bad service: %#v", result)
}
return svc
}
// testRegisterService register a service with given transaction idx
// If the service already exists, transaction number might not be increased
// Use `testRegisterServiceWithChange()` if you want perform a registration that
// ensures the transaction is updated by setting idx in Meta of Service
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) *structs.NodeService {
return testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
}
func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {

View File

@ -410,13 +410,64 @@ The corresponding CLI command is [`consul catalog services`](/commands/catalog/s
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried.
- `node-meta` `(string: "")` - Specifies a desired node metadata key/value pair
- `node-meta` `(string: "")` **Deprecated** - Use `filter` with the `NodeMeta` selector instead.
This parameter will be removed in a future version of Consul.
Specifies a desired node metadata key/value pair
of the form `key:value`. This parameter can be specified multiple times, and
filters the results to nodes with the specified key/value pairs.
- `ns` `(string: "")` <EnterpriseAlert inline /> - Specifies the namespace of the services you lookup.
You can also [specify the namespace through other methods](#methods-to-specify-namespace).
- `filter` `(string: "")` - Specifies the expression used to filter the
queries results prior to returning the data.
### Filtering
The filter will be executed against each Service mapping within the catalog.
The following selectors and filter operations are supported:
| Selector | Supported Operations |
| ---------------------------------------------------- | -------------------------------------------------- |
| `Address` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `Node` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `NodeMeta.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `NodeMeta` | Is Empty, Is Not Empty, In, Not In |
| `ServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceConnect.Native` | Equal, Not Equal |
| `ServiceEnableTagOverride` | Equal, Not Equal |
| `ServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceKind` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceMeta.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceMeta` | Is Empty, Is Not Empty, In, Not In |
| `ServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServicePort` | Equal, Not Equal |
| `ServiceProxy.DestinationServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.DestinationServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.LocalServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.LocalServicePort` | Equal, Not Equal |
| `ServiceProxy.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.TransparentProxy.OutboundListenerPort` | Equal, Not Equal |
| `ServiceProxy.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationNamespace` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationType` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.LocalBindAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.LocalBindPort` | Equal, Not Equal |
| `ServiceProxy.Upstreams.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams` | Is Empty, Is Not Empty |
| `ServiceTaggedAddresses.<any>.Address` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceTaggedAddresses.<any>.Port` | Equal, Not Equal |
| `ServiceTaggedAddresses` | Is Empty, Is Not Empty, In, Not In |
| `ServiceTags` | In, Not In, Is Empty, Is Not Empty |
| `ServiceWeights.Passing` | Equal, Not Equal |
| `ServiceWeights.Warning` | Equal, Not Equal |
| `TaggedAddresses.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `TaggedAddresses` | Is Empty, Is Not Empty, In, Not In |
### Sample Request
```shell-session