diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 43dc705ea7..b4f7ce2790 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -257,19 +257,21 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - var index uint64 - var nodes structs.Nodes var err error if len(args.NodeMetaFilters) > 0 { - index, nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters) + reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters) } else { - index, nodes, err = state.Nodes(ws) + reply.Index, reply.Nodes, err = state.Nodes(ws) } if err != nil { return err } + if isUnmodified(args.QueryOptions, reply.Index) { + reply.QueryMeta.NotModified = true + reply.Nodes = nil + return nil + } - reply.Index, reply.Nodes = index, nodes if err := c.srv.filterACL(args.Token, reply); err != nil { return err } @@ -284,13 +286,17 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde }) } +func isUnmodified(opts structs.QueryOptions, index uint64) bool { + return opts.AllowNotModifiedResponse && opts.MinQueryIndex > 0 && opts.MinQueryIndex == index +} + // ListServices is used to query the services in a DC func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done { return err } - (*reply).EnterpriseMeta = args.EnterpriseMeta + reply.EnterpriseMeta = args.EnterpriseMeta authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) if err != nil { @@ -305,19 +311,21 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - var index uint64 - var services structs.Services var err error if len(args.NodeMetaFilters) > 0 { - index, services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta) + reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta) } else { - index, services, err = state.Services(ws, &args.EnterpriseMeta) + reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta) } if err != nil { return err } + if isUnmodified(args.QueryOptions, reply.Index) { + reply.Services = nil + reply.QueryMeta.NotModified = true + return nil + } - reply.Index, reply.Services, reply.EnterpriseMeta = index, services, args.EnterpriseMeta return c.srv.filterACLWithAuthorizer(authz, reply) }) } diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index f38e84dfc9..067574d312 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -783,6 +783,21 @@ func TestCatalog_ListNodes(t *testing.T) { if out.Nodes[0].Address != "127.0.0.1" { t.Fatalf("bad: %v", out) } + require.False(t, out.QueryMeta.NotModified) + + t.Run("with option AllowNotModifiedResponse", func(t *testing.T) { + args.QueryOptions = structs.QueryOptions{ + MinQueryIndex: out.QueryMeta.Index, + MaxQueryTime: 20 * time.Millisecond, + AllowNotModifiedResponse: true, + } + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) + require.NoError(t, err) + + require.Equal(t, out.Index, out.QueryMeta.Index) + require.Len(t, out.Nodes, 0) + require.True(t, out.QueryMeta.NotModified, "NotModified should be true") + }) } func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) { @@ -1322,6 +1337,21 @@ func TestCatalog_ListServices(t *testing.T) { if out.Services["db"][0] != "primary" { t.Fatalf("bad: %v", out) } + require.False(t, out.QueryMeta.NotModified) + + t.Run("with option AllowNotModifiedResponse", func(t *testing.T) { + args.QueryOptions = structs.QueryOptions{ + MinQueryIndex: out.QueryMeta.Index, + MaxQueryTime: 20 * time.Millisecond, + AllowNotModifiedResponse: true, + } + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) + require.NoError(t, err) + + require.Equal(t, out.Index, out.QueryMeta.Index) + require.Len(t, out.Services, 0) + require.True(t, out.QueryMeta.NotModified, "NotModified should be true") + }) } func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e1d638a261..dc4f67df8f 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -212,6 +212,11 @@ type QueryOptions struct { // Filter specifies the go-bexpr filter expression to be used for // filtering the data prior to returning a response Filter string + + // AllowNotModifiedResponse indicates that if the MinIndex matches the + // QueryMeta.Index, the response can be left empty and QueryMeta.NotModified + // will be set to true to indicate the result of the query has not changed. + AllowNotModifiedResponse bool } // IsRead is always true for QueryOption. @@ -268,7 +273,7 @@ func (w *WriteRequest) SetTokenSecret(s string) { // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { - // This is the index associated with the read + // Index in the raft log of the latest item returned by the query. Index uint64 // If AllowStale is used, this is time elapsed since @@ -283,6 +288,12 @@ type QueryMeta struct { // Having `discovery_max_stale` on the agent can affect whether // the request was served by a leader. ConsistencyLevel string + + // NotModified is true when the Index of the query is the same value as the + // requested MinIndex. It indicates that the entity has not been modified. + // When NotModified is true, the response will not contain the result of + // the query. + NotModified bool } // RegisterRequest is used for the Catalog.Register endpoint