mirror of https://github.com/status-im/consul.git
First pass at adding node meta filter to prepared queries
This commit is contained in:
parent
945da0395a
commit
4e8c0fca63
|
@ -138,13 +138,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
|||
|
||||
reply.Index, reply.Nodes = index, nodes
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
var filtered structs.CheckServiceNodes
|
||||
for _, node := range nodes {
|
||||
if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) {
|
||||
filtered = append(filtered, node)
|
||||
}
|
||||
}
|
||||
reply.Nodes = filtered
|
||||
reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
|
||||
}
|
||||
if err := h.srv.filterACL(args.Token, reply); err != nil {
|
||||
return err
|
||||
|
|
|
@ -38,6 +38,11 @@ var (
|
|||
"${match(1)}",
|
||||
"${match(2)}",
|
||||
},
|
||||
NodeMeta: map[string]string{
|
||||
"${name.full}": "${name.prefix}",
|
||||
"${name.suffix}": "${match(0)}",
|
||||
"${match(1)}": "${match(2)}",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -222,6 +227,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||
"${match(4)}",
|
||||
"${40 + 2}",
|
||||
},
|
||||
NodeMeta: map[string]string{"${match(1)}": "${match(2)}"},
|
||||
},
|
||||
}
|
||||
ct, err := Compile(query)
|
||||
|
@ -252,6 +258,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||
"",
|
||||
"42",
|
||||
},
|
||||
NodeMeta: map[string]string{"hello": "foo"},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
|
@ -282,6 +289,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||
"",
|
||||
"42",
|
||||
},
|
||||
NodeMeta: map[string]string{"": ""},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
|
|
|
@ -34,6 +34,25 @@ func visit(path string, v reflect.Value, t reflect.Type, fn visitor) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
case reflect.Map:
|
||||
for i, key := range v.MapKeys() {
|
||||
value := v.MapIndex(key)
|
||||
|
||||
newKey := reflect.New(key.Type()).Elem()
|
||||
newKey.SetString(key.String())
|
||||
newValue := reflect.New(value.Type()).Elem()
|
||||
newValue.SetString(value.String())
|
||||
|
||||
if err := visit(fmt.Sprintf("%s.keys[%d]", path, i), newKey, newKey.Type(), fn); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := visit(fmt.Sprintf("%s[%s]", path, key.String()), newValue, newValue.Type(), fn); err != nil {
|
||||
return err
|
||||
}
|
||||
// delete the old entry and add the new one
|
||||
v.SetMapIndex(key, reflect.Value{})
|
||||
v.SetMapIndex(newKey, newValue)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
|||
},
|
||||
Near: "_agent",
|
||||
Tags: []string{"tag1", "tag2", "tag3"},
|
||||
NodeMeta: map[string]string{"role": "server"},
|
||||
}
|
||||
if err := walk(service, fn); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -35,6 +36,8 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
|||
".Tags[0]:tag1",
|
||||
".Tags[1]:tag2",
|
||||
".Tags[2]:tag3",
|
||||
".NodeMeta.keys[0]:role",
|
||||
".NodeMeta[role]:server",
|
||||
}
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Fatalf("bad: %#v", actual)
|
||||
|
|
|
@ -492,6 +492,11 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
|
|||
// Filter out any unhealthy nodes.
|
||||
nodes = nodes.Filter(query.Service.OnlyPassing)
|
||||
|
||||
// Apply the node metadata filters, if any.
|
||||
if len(query.Service.NodeMeta) > 0 {
|
||||
nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
|
||||
}
|
||||
|
||||
// Apply the tag filters, if any.
|
||||
if len(query.Service.Tags) > 0 {
|
||||
nodes = tagFilter(query.Service.Tags, nodes)
|
||||
|
@ -562,6 +567,18 @@ func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServ
|
|||
return nodes[:n]
|
||||
}
|
||||
|
||||
// nodeMetaFilter returns a list of the nodes who satisfy the given metadata filters. Nodes
|
||||
// must have ALL the given tags.
|
||||
func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
|
||||
var filtered structs.CheckServiceNodes
|
||||
for _, node := range nodes {
|
||||
if structs.SatisfiesMetaFilters(node.Node.Meta, filters) {
|
||||
filtered = append(filtered, node)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
// queryServer is a wrapper that makes it easier to test the failover logic.
|
||||
type queryServer interface {
|
||||
GetLogger() *log.Logger
|
||||
|
|
|
@ -1482,6 +1482,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
Datacenter: dc,
|
||||
Node: fmt.Sprintf("node%d", i+1),
|
||||
Address: fmt.Sprintf("127.0.0.%d", i+1),
|
||||
NodeMeta: map[string]string{
|
||||
"group": fmt.Sprintf("%d", i/5),
|
||||
"instance_type": "t2.micro",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "foo",
|
||||
Port: 8000,
|
||||
|
@ -1489,6 +1493,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if i == 0 {
|
||||
req.NodeMeta["unique"] = "true"
|
||||
}
|
||||
|
||||
var codec rpc.ClientCodec
|
||||
if dc == "dc1" {
|
||||
|
@ -1587,6 +1594,73 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Run various service queries with node metadata filters.
|
||||
if false {
|
||||
cases := []struct{
|
||||
filters map[string]string
|
||||
numNodes int
|
||||
}{
|
||||
{
|
||||
filters: map[string]string{},
|
||||
numNodes: 10,
|
||||
},
|
||||
{
|
||||
filters: map[string]string{"instance_type": "t2.micro"},
|
||||
numNodes: 10,
|
||||
},
|
||||
{
|
||||
filters: map[string]string{"group": "1"},
|
||||
numNodes: 5,
|
||||
},
|
||||
{
|
||||
filters: map[string]string{"group": "0", "unique": "true"},
|
||||
numNodes: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
nodeMetaQuery := structs.PreparedQueryRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.PreparedQueryCreate,
|
||||
Query: &structs.PreparedQuery{
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "foo",
|
||||
NodeMeta: tc.filters,
|
||||
},
|
||||
DNS: structs.QueryDNSOptions{
|
||||
TTL: "10s",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: nodeMetaQuery.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(reply.Nodes) != tc.numNodes {
|
||||
t.Fatalf("bad: %v, %v", len(reply.Nodes), tc.numNodes)
|
||||
}
|
||||
|
||||
for _, node := range reply.Nodes {
|
||||
if !structs.SatisfiesMetaFilters(node.Node.Meta, tc.filters) {
|
||||
t.Fatalf("bad: %v", node.Node.Meta)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Push a coordinate for one of the nodes so we can try an RTT sort. We
|
||||
// have to sleep a little while for the coordinate batch to get flushed.
|
||||
{
|
||||
|
|
|
@ -44,6 +44,11 @@ type ServiceQuery struct {
|
|||
// this list it must be present. If the tag is preceded with "!" then
|
||||
// it is disallowed.
|
||||
Tags []string
|
||||
|
||||
// NodeMeta is a map of required node metadata fields. If a key/value
|
||||
// pair is in this map it must be present on the node in order for the
|
||||
// service entry to be returned.
|
||||
NodeMeta map[string]string
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue