mirror of https://github.com/status-im/consul.git
Improve Connect with Prepared Queries (#5291)
Given a query like: ``` { "Name": "tagged-connect-query", "Service": { "Service": "foo", "Tags": ["tag"], "Connect": true } } ``` And a Consul configuration like: ``` { "services": [ "name": "foo", "port": 8080, "connect": { "sidecar_service": {} }, "tags": ["tag"] ] } ``` If you executed the query it would always turn up with 0 results. This was because the sidecar service was being created without any tags. You could instead make your config look like: ``` { "services": [ "name": "foo", "port": 8080, "connect": { "sidecar_service": { "tags": ["tag"] } }, "tags": ["tag"] ] } ``` However that is a bit redundant for most cases. This PR ensures that the tags and service meta of the parent service get copied to the sidecar service. If there are any tags or service meta set in the sidecar service definition then this copying does not take place. After the changes, the query will now return the expected results. A second change was made to prepared queries in this PR which is to allow filtering on ServiceMeta just like we allow for filtering on NodeMeta.
This commit is contained in:
parent
6e83f68637
commit
acfd87c673
|
@ -2169,6 +2169,84 @@ func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t.Name(), `
|
||||||
|
service = {
|
||||||
|
id = "rabbitmq"
|
||||||
|
name = "rabbitmq"
|
||||||
|
port = 5672
|
||||||
|
tags = ["a", "b"],
|
||||||
|
meta = {
|
||||||
|
environment = "prod"
|
||||||
|
}
|
||||||
|
connect = {
|
||||||
|
sidecar_service {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`)
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
services := a.State.Services()
|
||||||
|
|
||||||
|
svc, ok := services["rabbitmq"]
|
||||||
|
require.True(t, ok, "missing service")
|
||||||
|
require.Len(t, svc.Tags, 2)
|
||||||
|
require.Len(t, svc.Meta, 1)
|
||||||
|
|
||||||
|
sidecar, ok := services["rabbitmq-sidecar-proxy"]
|
||||||
|
require.True(t, ok, "missing sidecar service")
|
||||||
|
require.ElementsMatch(t, svc.Tags, sidecar.Tags)
|
||||||
|
require.Len(t, sidecar.Meta, 1)
|
||||||
|
meta, ok := sidecar.Meta["environment"]
|
||||||
|
require.True(t, ok, "missing sidecar service meta")
|
||||||
|
require.Equal(t, "prod", meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t.Name(), `
|
||||||
|
service = {
|
||||||
|
id = "rabbitmq"
|
||||||
|
name = "rabbitmq"
|
||||||
|
port = 5672
|
||||||
|
tags = ["a", "b"],
|
||||||
|
meta = {
|
||||||
|
environment = "prod"
|
||||||
|
}
|
||||||
|
connect = {
|
||||||
|
sidecar_service {
|
||||||
|
tags = ["foo"],
|
||||||
|
meta = {
|
||||||
|
environment = "qa"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`)
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
services := a.State.Services()
|
||||||
|
|
||||||
|
svc, ok := services["rabbitmq"]
|
||||||
|
require.True(t, ok, "missing service")
|
||||||
|
require.Len(t, svc.Tags, 2)
|
||||||
|
require.Len(t, svc.Meta, 1)
|
||||||
|
|
||||||
|
sidecar, ok := services["rabbitmq-sidecar-proxy"]
|
||||||
|
require.True(t, ok, "missing sidecar service")
|
||||||
|
require.Len(t, sidecar.Tags, 1)
|
||||||
|
require.Equal(t, "foo", sidecar.Tags[0])
|
||||||
|
require.Len(t, sidecar.Meta, 1)
|
||||||
|
meta, ok := sidecar.Meta["environment"]
|
||||||
|
require.True(t, ok, "missing sidecar service meta")
|
||||||
|
require.Equal(t, "qa", meta)
|
||||||
|
}
|
||||||
|
|
||||||
func TestAgent_unloadServices(t *testing.T) {
|
func TestAgent_unloadServices(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
a := NewTestAgent(t.Name(), "")
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
|
|
@ -534,6 +534,11 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
|
||||||
nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
|
nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply the service metadata filters, if any.
|
||||||
|
if len(query.Service.ServiceMeta) > 0 {
|
||||||
|
nodes = serviceMetaFilter(query.Service.ServiceMeta, nodes)
|
||||||
|
}
|
||||||
|
|
||||||
// Apply the tag filters, if any.
|
// Apply the tag filters, if any.
|
||||||
if len(query.Service.Tags) > 0 {
|
if len(query.Service.Tags) > 0 {
|
||||||
nodes = tagFilter(query.Service.Tags, nodes)
|
nodes = tagFilter(query.Service.Tags, nodes)
|
||||||
|
@ -616,6 +621,16 @@ func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes)
|
||||||
return filtered
|
return filtered
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
|
||||||
|
var filtered structs.CheckServiceNodes
|
||||||
|
for _, node := range nodes {
|
||||||
|
if structs.SatisfiesMetaFilters(node.Service.Meta, filters) {
|
||||||
|
filtered = append(filtered, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return filtered
|
||||||
|
}
|
||||||
|
|
||||||
// queryServer is a wrapper that makes it easier to test the failover logic.
|
// queryServer is a wrapper that makes it easier to test the failover logic.
|
||||||
type queryServer interface {
|
type queryServer interface {
|
||||||
GetLogger() *log.Logger
|
GetLogger() *log.Logger
|
||||||
|
|
|
@ -1512,11 +1512,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
Service: "foo",
|
Service: "foo",
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
Tags: []string{dc, fmt.Sprintf("tag%d", i+1)},
|
Tags: []string{dc, fmt.Sprintf("tag%d", i+1)},
|
||||||
|
Meta: map[string]string{
|
||||||
|
"svc-group": fmt.Sprintf("%d", i%2),
|
||||||
|
"foo": "true",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
}
|
}
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
req.NodeMeta["unique"] = "true"
|
req.NodeMeta["unique"] = "true"
|
||||||
|
req.Service.Meta["unique"] = "true"
|
||||||
}
|
}
|
||||||
|
|
||||||
var codec rpc.ClientCodec
|
var codec rpc.ClientCodec
|
||||||
|
@ -1617,7 +1622,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run various service queries with node metadata filters.
|
// Run various service queries with node metadata filters.
|
||||||
if false {
|
{
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
filters map[string]string
|
filters map[string]string
|
||||||
numNodes int
|
numNodes int
|
||||||
|
@ -1682,6 +1687,67 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run various service queries with service metadata filters
|
||||||
|
{
|
||||||
|
cases := []struct {
|
||||||
|
filters map[string]string
|
||||||
|
numNodes int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
filters: map[string]string{},
|
||||||
|
numNodes: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: map[string]string{"foo": "true"},
|
||||||
|
numNodes: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: map[string]string{"svc-group": "0"},
|
||||||
|
numNodes: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: map[string]string{"svc-group": "1"},
|
||||||
|
numNodes: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: map[string]string{"svc-group": "0", "unique": "true"},
|
||||||
|
numNodes: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
svcMetaQuery := structs.PreparedQueryRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.PreparedQueryCreate,
|
||||||
|
Query: &structs.PreparedQuery{
|
||||||
|
Service: structs.ServiceQuery{
|
||||||
|
Service: "foo",
|
||||||
|
ServiceMeta: tc.filters,
|
||||||
|
},
|
||||||
|
DNS: structs.QueryDNSOptions{
|
||||||
|
TTL: "10s",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &svcMetaQuery, &svcMetaQuery.Query.ID))
|
||||||
|
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: svcMetaQuery.Query.ID,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
|
||||||
|
require.Len(t, reply.Nodes, tc.numNodes)
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
require.True(t, structs.SatisfiesMetaFilters(node.Service.Meta, tc.filters))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Push a coordinate for one of the nodes so we can try an RTT sort. We
|
// Push a coordinate for one of the nodes so we can try an RTT sort. We
|
||||||
// have to sleep a little while for the coordinate batch to get flushed.
|
// have to sleep a little while for the coordinate batch to get flushed.
|
||||||
{
|
{
|
||||||
|
|
|
@ -96,6 +96,7 @@ func TestPreparedQuery_Create(t *testing.T) {
|
||||||
OnlyPassing: true,
|
OnlyPassing: true,
|
||||||
Tags: []string{"foo", "bar"},
|
Tags: []string{"foo", "bar"},
|
||||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||||
|
ServiceMeta: map[string]string{"env": "prod"},
|
||||||
},
|
},
|
||||||
DNS: structs.QueryDNSOptions{
|
DNS: structs.QueryDNSOptions{
|
||||||
TTL: "10s",
|
TTL: "10s",
|
||||||
|
@ -132,6 +133,7 @@ func TestPreparedQuery_Create(t *testing.T) {
|
||||||
"OnlyPassing": true,
|
"OnlyPassing": true,
|
||||||
"Tags": []string{"foo", "bar"},
|
"Tags": []string{"foo", "bar"},
|
||||||
"NodeMeta": map[string]string{"somekey": "somevalue"},
|
"NodeMeta": map[string]string{"somekey": "somevalue"},
|
||||||
|
"ServiceMeta": map[string]string{"env": "prod"},
|
||||||
},
|
},
|
||||||
"DNS": map[string]interface{}{
|
"DNS": map[string]interface{}{
|
||||||
"TTL": "10s",
|
"TTL": "10s",
|
||||||
|
|
|
@ -49,6 +49,21 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Copy the service metadata from the original service if no other meta was provided
|
||||||
|
if len(sidecar.Meta) == 0 && len(ns.Meta) > 0 {
|
||||||
|
if sidecar.Meta == nil {
|
||||||
|
sidecar.Meta = make(map[string]string)
|
||||||
|
}
|
||||||
|
for k, v := range ns.Meta {
|
||||||
|
sidecar.Meta[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the tags from the original service if no other tags were specified
|
||||||
|
if len(sidecar.Tags) == 0 && len(ns.Tags) > 0 {
|
||||||
|
sidecar.Tags = append(sidecar.Tags, ns.Tags...)
|
||||||
|
}
|
||||||
|
|
||||||
// Flag this as a sidecar - this is not persisted in catalog but only needed
|
// Flag this as a sidecar - this is not persisted in catalog but only needed
|
||||||
// in local agent state to disambiguate lineage when deregistering the parent
|
// in local agent state to disambiguate lineage when deregistering the parent
|
||||||
// service later.
|
// service later.
|
||||||
|
|
|
@ -77,6 +77,8 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
|
||||||
ID: "web1",
|
ID: "web1",
|
||||||
Name: "web",
|
Name: "web",
|
||||||
Port: 1111,
|
Port: 1111,
|
||||||
|
Tags: []string{"baz"},
|
||||||
|
Meta: map[string]string{"foo": "baz"},
|
||||||
Connect: &structs.ServiceConnect{
|
Connect: &structs.ServiceConnect{
|
||||||
SidecarService: &structs.ServiceDefinition{
|
SidecarService: &structs.ServiceDefinition{
|
||||||
Name: "motorbike1",
|
Name: "motorbike1",
|
||||||
|
@ -167,6 +169,45 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
|
||||||
token: "foo",
|
token: "foo",
|
||||||
wantErr: "auto-assignment disabled in config",
|
wantErr: "auto-assignment disabled in config",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "inherit tags and meta",
|
||||||
|
sd: &structs.ServiceDefinition{
|
||||||
|
ID: "web1",
|
||||||
|
Name: "web",
|
||||||
|
Port: 1111,
|
||||||
|
Tags: []string{"foo"},
|
||||||
|
Meta: map[string]string{"foo": "bar"},
|
||||||
|
Connect: &structs.ServiceConnect{
|
||||||
|
SidecarService: &structs.ServiceDefinition{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantNS: &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: "web1-sidecar-proxy",
|
||||||
|
Service: "web-sidecar-proxy",
|
||||||
|
Port: 2222,
|
||||||
|
Tags: []string{"foo"},
|
||||||
|
Meta: map[string]string{"foo": "bar"},
|
||||||
|
LocallyRegisteredAsSidecar: true,
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "web",
|
||||||
|
DestinationServiceID: "web1",
|
||||||
|
LocalServiceAddress: "127.0.0.1",
|
||||||
|
LocalServicePort: 1111,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantChecks: []*structs.CheckType{
|
||||||
|
&structs.CheckType{
|
||||||
|
Name: "Connect Sidecar Listening",
|
||||||
|
TCP: "127.0.0.1:2222",
|
||||||
|
Interval: 10 * time.Second,
|
||||||
|
},
|
||||||
|
&structs.CheckType{
|
||||||
|
Name: "Connect Sidecar Aliasing web1",
|
||||||
|
AliasService: "web1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "invalid check type",
|
name: "invalid check type",
|
||||||
sd: &structs.ServiceDefinition{
|
sd: &structs.ServiceDefinition{
|
||||||
|
|
|
@ -64,6 +64,11 @@ type ServiceQuery struct {
|
||||||
// service entry to be returned.
|
// service entry to be returned.
|
||||||
NodeMeta map[string]string
|
NodeMeta map[string]string
|
||||||
|
|
||||||
|
// ServiceMeta is a map of required service metadata fields. If a key/value
|
||||||
|
// pair is in this map it must be present on the node in order for the
|
||||||
|
// service entry to be returned.
|
||||||
|
ServiceMeta map[string]string
|
||||||
|
|
||||||
// Connect if true will filter the prepared query results to only
|
// Connect if true will filter the prepared query results to only
|
||||||
// include Connect-capable services. These include both native services
|
// include Connect-capable services. These include both native services
|
||||||
// and proxies for matching services. Note that if a proxy matches,
|
// and proxies for matching services. Note that if a proxy matches,
|
||||||
|
|
|
@ -55,6 +55,11 @@ type ServiceQuery struct {
|
||||||
// service entry to be returned.
|
// service entry to be returned.
|
||||||
NodeMeta map[string]string
|
NodeMeta map[string]string
|
||||||
|
|
||||||
|
// ServiceMeta is a map of required service metadata fields. If a key/value
|
||||||
|
// pair is in this map it must be present on the node in order for the
|
||||||
|
// service entry to be returned.
|
||||||
|
ServiceMeta map[string]string
|
||||||
|
|
||||||
// Connect if true will filter the prepared query results to only
|
// Connect if true will filter the prepared query results to only
|
||||||
// include Connect-capable services. These include both native services
|
// include Connect-capable services. These include both native services
|
||||||
// and proxies for matching services. Note that if a proxy matches,
|
// and proxies for matching services. Note that if a proxy matches,
|
||||||
|
|
|
@ -25,6 +25,7 @@ func TestAPI_PreparedQuery(t *testing.T) {
|
||||||
ID: "redis1",
|
ID: "redis1",
|
||||||
Service: "redis",
|
Service: "redis",
|
||||||
Tags: []string{"master", "v1"},
|
Tags: []string{"master", "v1"},
|
||||||
|
Meta: map[string]string{"redis-version": "4.0"},
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -45,6 +46,7 @@ func TestAPI_PreparedQuery(t *testing.T) {
|
||||||
Service: ServiceQuery{
|
Service: ServiceQuery{
|
||||||
Service: "redis",
|
Service: "redis",
|
||||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||||
|
ServiceMeta: map[string]string{"redis-version": "4.0"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -234,6 +234,10 @@ The table below shows this endpoint's support for
|
||||||
key/value pairs that will be used for filtering the query results to nodes
|
key/value pairs that will be used for filtering the query results to nodes
|
||||||
with the given metadata values present.
|
with the given metadata values present.
|
||||||
|
|
||||||
|
- `ServiceMeta` `(map<string|string>: nil)` - Specifies a list of user-defined
|
||||||
|
key/value pairs that will be used for filtering the query results to services
|
||||||
|
with the given metadata values present.
|
||||||
|
|
||||||
- `Connect` `(bool: false)` - If true, only [Connect-capable](/docs/connect/index.html) services
|
- `Connect` `(bool: false)` - If true, only [Connect-capable](/docs/connect/index.html) services
|
||||||
for the specified service name will be returned. This includes both
|
for the specified service name will be returned. This includes both
|
||||||
natively integrated services and proxies. For proxies, the proxy name
|
natively integrated services and proxies. For proxies, the proxy name
|
||||||
|
@ -263,7 +267,8 @@ The table below shows this endpoint's support for
|
||||||
"Near": "node1",
|
"Near": "node1",
|
||||||
"OnlyPassing": false,
|
"OnlyPassing": false,
|
||||||
"Tags": ["primary", "!experimental"],
|
"Tags": ["primary", "!experimental"],
|
||||||
"NodeMeta": {"instance_type": "m3.large"}
|
"NodeMeta": {"instance_type": "m3.large"},
|
||||||
|
"ServiceMeta": {"environment": "production"}
|
||||||
},
|
},
|
||||||
"DNS": {
|
"DNS": {
|
||||||
"TTL": "10s"
|
"TTL": "10s"
|
||||||
|
@ -336,7 +341,8 @@ $ curl \
|
||||||
},
|
},
|
||||||
"OnlyPassing": false,
|
"OnlyPassing": false,
|
||||||
"Tags": ["primary", "!experimental"],
|
"Tags": ["primary", "!experimental"],
|
||||||
"NodeMeta": {"instance_type": "m3.large"}
|
"NodeMeta": {"instance_type": "m3.large"},
|
||||||
|
"ServiceMeta": {"environment": "production"}
|
||||||
},
|
},
|
||||||
"DNS": {
|
"DNS": {
|
||||||
"TTL": "10s"
|
"TTL": "10s"
|
||||||
|
|
|
@ -122,6 +122,8 @@ proxy.
|
||||||
be overridden as it is used to [manage the lifecycle](#lifecycle) of the
|
be overridden as it is used to [manage the lifecycle](#lifecycle) of the
|
||||||
registration.
|
registration.
|
||||||
- `name` - Defaults to being `<parent-service-name>-sidecar-proxy`.
|
- `name` - Defaults to being `<parent-service-name>-sidecar-proxy`.
|
||||||
|
- `tags` - Defaults to the tags of the parent service.
|
||||||
|
- `meta` - Defaults to the service metadata of the parent service.
|
||||||
- `port` - Defaults to being auto-assigned from a [configurable
|
- `port` - Defaults to being auto-assigned from a [configurable
|
||||||
range](/docs/agent/options.html#sidecar_min_port) that is
|
range](/docs/agent/options.html#sidecar_min_port) that is
|
||||||
by default `[21000, 21255]`.
|
by default `[21000, 21255]`.
|
||||||
|
|
Loading…
Reference in New Issue