diff --git a/consul/prepared_query_endpoint_test.go b/consul/prepared_query_endpoint_test.go
index 881715c46b..59a7dde6da 100644
--- a/consul/prepared_query_endpoint_test.go
+++ b/consul/prepared_query_endpoint_test.go
@@ -7,10 +7,12 @@ import (
 	"reflect"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
+	"github.com/hashicorp/serf/coordinate"
 )
 
 func TestPreparedQuery_Apply(t *testing.T) {
@@ -870,12 +872,9 @@ func TestPreparedQuery_List(t *testing.T) {
 			},
 			WriteRequest: structs.WriteRequest{Token: "root"},
 		}
-		var reply string
-
-		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &reply); err != nil {
+		if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
 			t.Fatalf("err: %v", err)
 		}
-		token = reply
 	}
 
 	// Set up a node and service in the catalog.
@@ -993,6 +992,550 @@ func TestPreparedQuery_List(t *testing.T) {
 	}
 }
 
+// This is a beast of a test, but the setup is so extensive it makes sense to
+// walk through the different cases once we have it up. This is broken into
+// sections so it's still pretty easy to read.
+func TestPreparedQuery_Execute(t *testing.T) {
+	dir1, s1 := testServerWithConfig(t, func(c *Config) {
+		c.ACLDatacenter = "dc1"
+		c.ACLMasterToken = "root"
+		c.ACLDefaultPolicy = "deny"
+	})
+	defer os.RemoveAll(dir1)
+	defer s1.Shutdown()
+	codec1 := rpcClient(t, s1)
+	defer codec1.Close()
+
+	dir2, s2 := testServerWithConfig(t, func(c *Config) {
+		c.Datacenter = "dc2"
+		c.ACLDatacenter = "dc1"
+		c.ACLMasterToken = "root"
+		c.ACLDefaultPolicy = "deny"
+	})
+	defer os.RemoveAll(dir2)
+	defer s2.Shutdown()
+	codec2 := rpcClient(t, s2)
+	defer codec2.Close()
+
+	testutil.WaitForLeader(t, s1.RPC, "dc1")
+	testutil.WaitForLeader(t, s2.RPC, "dc2")
+
+	// Try to WAN join.
+	addr := fmt.Sprintf("127.0.0.1:%d",
+		s1.config.SerfWANConfig.MemberlistConfig.BindPort)
+	if _, err := s2.JoinWAN([]string{addr}); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	testutil.WaitForResult(
+		func() (bool, error) {
+			return len(s1.WANMembers()) > 1, nil
+		},
+		func(err error) {
+			t.Fatalf("Failed waiting for WAN join: %v", err)
+		})
+
+	// Create an ACL with read permission to the service.
+	var token string
+	{
+		var rules = `
+                    service "foo" {
+                        policy = "read"
+                    }
+                `
+
+		req := structs.ACLRequest{
+			Datacenter: "dc1",
+			Op:         structs.ACLSet,
+			ACL: structs.ACL{
+				Name:  "User token",
+				Type:  structs.ACLTypeClient,
+				Rules: rules,
+			},
+			WriteRequest: structs.WriteRequest{Token: "root"},
+		}
+		if err := msgpackrpc.CallWithCodec(codec1, "ACL.Apply", &req, &token); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Set up some nodes in each DC that host the service.
+	{
+		for i := 0; i < 10; i++ {
+			for _, dc := range []string{"dc1", "dc2"} {
+				req := structs.RegisterRequest{
+					Datacenter: dc,
+					Node:       fmt.Sprintf("node%d", i+1),
+					Address:    fmt.Sprintf("127.0.0.%d", i+1),
+					Service: &structs.NodeService{
+						Service: "foo",
+						Port:    8000,
+						Tags:    []string{dc, fmt.Sprintf("tag%d", i+1)},
+					},
+					WriteRequest: structs.WriteRequest{Token: "root"},
+				}
+
+				var codec rpc.ClientCodec
+				if dc == "dc1" {
+					codec = codec1
+				} else {
+					codec = codec2
+				}
+
+				var reply struct{}
+				if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
+					t.Fatalf("err: %v", err)
+				}
+			}
+		}
+	}
+
+	// Set up a service query.
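+	// Note: the Apply RPC returns the new query's generated ID in the
+	// reply, so we capture it straight into query.Query.ID for reuse below.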
+	query := structs.PreparedQueryRequest{
+		Datacenter: "dc1",
+		Op:         structs.PreparedQueryCreate,
+		Query: &structs.PreparedQuery{
+			Service: structs.ServiceQuery{
+				Service: "foo",
+			},
+			DNS: structs.QueryDNSOptions{
+				TTL: "10s",
+			},
+		},
+		WriteRequest: structs.WriteRequest{Token: token},
+	}
+	if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Run a query that doesn't exist.
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: "nope",
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)
+		if err == nil || !strings.Contains(err.Error(), ErrQueryNotFound.Error()) {
+			t.Fatalf("bad: %v", err)
+		}
+
+		if len(reply.Nodes) != 0 {
+			t.Fatalf("bad: %v", reply)
+		}
+	}
+
+	// Run the registered query.
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 10 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+	}
+
+	// Try with a limit.
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+			Limit:         3,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 3 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+	}
+
+	// Push a coordinate for one of the nodes so we can try an RTT sort. We
+	// have to sleep a little while for the coordinate batch to get flushed.
+	{
+		req := structs.CoordinateUpdateRequest{
+			Datacenter: "dc1",
+			Node:       "node3",
+			Coord:      coordinate.NewCoordinate(coordinate.DefaultConfig()),
+		}
+		var out struct{}
+		if err := msgpackrpc.CallWithCodec(codec1, "Coordinate.Update", &req, &out); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
+	}
+
+	// Try an RTT sort. We don't have any other coordinates in there but
+	// showing that the node with a coordinate is always first proves we
+	// call the RTT sorting function, which is tested elsewhere.
+	for i := 0; i < 100; i++ {
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+			Source: structs.QuerySource{
+				Datacenter: "dc1",
+				Node:       "node3",
+			},
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 10 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		if reply.Nodes[0].Node.Node != "node3" {
+			t.Fatalf("bad: %v", reply)
+		}
+	}
+
+	// Make sure the shuffle looks like it's working.
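+	// Record the order the nodes come back in on each pass; seeing many
+	// distinct orderings across 100 runs is evidence the results are
+	// actually being shuffled.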
+	uniques := make(map[string]struct{})
+	for i := 0; i < 100; i++ {
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 10 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		var names []string
+		for _, node := range reply.Nodes {
+			names = append(names, node.Node.Node)
+		}
+		key := strings.Join(names, "|")
+		uniques[key] = struct{}{}
+	}
+
+	// We have to allow for the fact that there won't always be a unique
+	// shuffle each pass, so we just look for smell here without the test
+	// being flaky.
+	if len(uniques) < 50 {
+		t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
+	}
+
+	// Update the health of a node to mark it critical.
+	setHealth := func(node string, health string) {
+		req := structs.RegisterRequest{
+			Datacenter: "dc1",
+			Node:       node,
+			Address:    "127.0.0.1",
+			Service: &structs.NodeService{
+				Service: "foo",
+				Port:    8000,
+				Tags:    []string{"dc1", "tag1"},
+			},
+			Check: &structs.HealthCheck{
+				Name:      "failing",
+				Status:    health,
+				ServiceID: "foo",
+			},
+			WriteRequest: structs.WriteRequest{Token: "root"},
+		}
+		var reply struct{}
+		if err := msgpackrpc.CallWithCodec(codec1, "Catalog.Register", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+	setHealth("node1", structs.HealthCritical)
+
+	// The failing node should be filtered.
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 9 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		for _, node := range reply.Nodes {
+			if node.Node.Node == "node1" {
+				t.Fatalf("bad: %v", node)
+			}
+		}
+	}
+
+	// Upgrade it to a warning and re-query, should be 10 nodes again.
+	setHealth("node1", structs.HealthWarning)
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 10 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+	}
+
+	// Make the query more picky so it excludes warning nodes.
+	query.Op = structs.PreparedQueryUpdate
+	query.Query.Service.OnlyPassing = true
+	if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// The node in the warning state should be filtered.
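+	// Since OnlyPassing is now set, node1's warning status should exclude
+	// it from the results.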
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 9 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		for _, node := range reply.Nodes {
+			if node.Node.Node == "node1" {
+				t.Fatalf("bad: %v", node)
+			}
+		}
+	}
+
+	// Make the query more picky by adding a tag filter. This just proves we
+	// call into the tag filter; it is tested more thoroughly in a separate
+	// test.
+	query.Query.Service.Tags = []string{"!tag3"}
+	if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// The node in the warning state should be filtered as well as the node
+	// with the filtered tag.
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 8 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		for _, node := range reply.Nodes {
+			if node.Node.Node == "node1" || node.Node.Node == "node3" {
+				t.Fatalf("bad: %v", node)
+			}
+		}
+	}
+
+	// Now fail everything in dc1 and we should get an empty list back.
+	for i := 0; i < 10; i++ {
+		setHealth(fmt.Sprintf("node%d", i+1), structs.HealthCritical)
+	}
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 0 ||
+			reply.Datacenter != "dc1" || reply.Failovers != 0 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+	}
+
+	// Modify the query to have it fail over to a bogus DC and then dc2.
+	query.Query.Service.Failover.Datacenters = []string{"bogus", "dc2"}
+	if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Now we should see 9 nodes from dc2 (we have the tag filter still).
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 9 ||
+			reply.Datacenter != "dc2" || reply.Failovers != 1 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		for _, node := range reply.Nodes {
+			if node.Node.Node == "node3" {
+				t.Fatalf("bad: %v", node)
+			}
+		}
+	}
+
+	// Make sure the limit and query options are forwarded.
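+	// The Limit and RequireConsistent options should be applied by the
+	// remote datacenter, not just locally.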
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+			Limit:         3,
+			QueryOptions:  structs.QueryOptions{RequireConsistent: true},
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 3 ||
+			reply.Datacenter != "dc2" || reply.Failovers != 1 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		for _, node := range reply.Nodes {
+			if node.Node.Node == "node3" {
+				t.Fatalf("bad: %v", node)
+			}
+		}
+	}
+
+	// Make sure the remote shuffle looks like it's working.
+	uniques = make(map[string]struct{})
+	for i := 0; i < 100; i++ {
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+
+		if len(reply.Nodes) != 9 ||
+			reply.Datacenter != "dc2" || reply.Failovers != 1 ||
+			!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
+			t.Fatalf("bad: %v", reply)
+		}
+		var names []string
+		for _, node := range reply.Nodes {
+			names = append(names, node.Node.Node)
+		}
+		key := strings.Join(names, "|")
+		uniques[key] = struct{}{}
+	}
+
+	// We have to allow for the fact that there won't always be a unique
+	// shuffle each pass, so we just look for smell here without the test
+	// being flaky.
+	if len(uniques) < 50 {
+		t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
+	}
+
+	// Finally, take away the token's ability to read the service.
+	{
+		var rules = `
+                    service "foo" {
+                        policy = "deny"
+                    }
+                `
+
+		req := structs.ACLRequest{
+			Datacenter: "dc1",
+			Op:         structs.ACLSet,
+			ACL: structs.ACL{
+				ID:    token,
+				Name:  "User token",
+				Type:  structs.ACLTypeClient,
+				Rules: rules,
+			},
+			WriteRequest: structs.WriteRequest{Token: "root"},
+		}
+		if err := msgpackrpc.CallWithCodec(codec1, "ACL.Apply", &req, &token); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Now the query should be denied.
+	{
+		req := structs.PreparedQueryExecuteRequest{
+			Datacenter:    "dc1",
+			QueryIDOrName: query.Query.ID,
+		}
+
+		var reply structs.PreparedQueryExecuteResponse
+		err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)
+		if err == nil || !strings.Contains(err.Error(), permissionDenied) {
+			t.Fatalf("bad: %v", err)
+		}
+
+		if len(reply.Nodes) != 0 {
+			t.Fatalf("bad: %v", reply)
+		}
+	}
+}
+
 func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
 	dir1, s1 := testServer(t)
 	defer os.RemoveAll(dir1)