mirror of https://github.com/status-im/consul.git
Adds execute tests for prepared queries.
This commit is contained in:
parent
30a18220af
commit
14170535e7
|
@ -7,10 +7,12 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/testutil"
|
"github.com/hashicorp/consul/testutil"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
|
"github.com/hashicorp/serf/coordinate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPreparedQuery_Apply(t *testing.T) {
|
func TestPreparedQuery_Apply(t *testing.T) {
|
||||||
|
@ -870,12 +872,9 @@ func TestPreparedQuery_List(t *testing.T) {
|
||||||
},
|
},
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
}
|
}
|
||||||
var reply string
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
||||||
|
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &reply); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
token = reply
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up a node and service in the catalog.
|
// Set up a node and service in the catalog.
|
||||||
|
@ -993,6 +992,541 @@ func TestPreparedQuery_List(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is a beast of a test, but the setup is so extensive it makes sense to
|
||||||
|
// walk through the different cases once we have it up. This is broken into
|
||||||
|
// sections so it's still pretty easy to read.
|
||||||
|
func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.ACLDatacenter = "dc1"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec1 := rpcClient(t, s1)
|
||||||
|
defer codec1.Close()
|
||||||
|
|
||||||
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.Datacenter = "dc2"
|
||||||
|
c.ACLDatacenter = "dc1"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
codec2 := rpcClient(t, s2)
|
||||||
|
defer codec2.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
testutil.WaitForLeader(t, s2.RPC, "dc2")
|
||||||
|
|
||||||
|
// Try to WAN join.
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := s2.JoinWAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
testutil.WaitForResult(
|
||||||
|
func() (bool, error) {
|
||||||
|
return len(s1.WANMembers()) > 1, nil
|
||||||
|
},
|
||||||
|
func(err error) {
|
||||||
|
t.Fatalf("Failed waiting for WAN join: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create an ACL with read permission to the service.
|
||||||
|
var token string
|
||||||
|
{
|
||||||
|
var rules = `
|
||||||
|
service "foo" {
|
||||||
|
policy = "read"
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
req := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: rules,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "ACL.Apply", &req, &token); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up some nodes in each DC that host the service.
|
||||||
|
{
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
for _, dc := range []string{"dc1", "dc2"} {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: dc,
|
||||||
|
Node: fmt.Sprintf("node%d", i+1),
|
||||||
|
Address: fmt.Sprintf("127.0.0.%d", i+1),
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "foo",
|
||||||
|
Port: 8000,
|
||||||
|
Tags: []string{dc, fmt.Sprintf("tag%d", i+1)},
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
|
||||||
|
var codec rpc.ClientCodec
|
||||||
|
if dc == "dc1" {
|
||||||
|
codec = codec1
|
||||||
|
} else {
|
||||||
|
codec = codec2
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up a service query.
|
||||||
|
query := structs.PreparedQueryRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.PreparedQueryCreate,
|
||||||
|
Query: &structs.PreparedQuery{
|
||||||
|
Service: structs.ServiceQuery{
|
||||||
|
Service: "foo",
|
||||||
|
},
|
||||||
|
DNS: structs.QueryDNSOptions{
|
||||||
|
TTL: "10s",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: token},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run a query that doesn't exist.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: "nope",
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), ErrQueryNotFound.Error()) {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 0 {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the registered query.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 10 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try with a limit.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
Limit: 3,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 3 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push a coordinate for one of the nodes so we can try an RTT sort. We
|
||||||
|
// have to sleep a little while for the coordinate batch to get flushed.
|
||||||
|
{
|
||||||
|
req := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node3",
|
||||||
|
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "Coordinate.Update", &req, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try an RTT sort. We don't have any other coordinates in there but
|
||||||
|
// showing that the node with a coordinate is always first proves we
|
||||||
|
// call the RTT sorting function, which is tested elsewhere.
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
Source: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node3",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 10 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
if reply.Nodes[0].Node.Node != "node3" {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the shuffle looks like it's working.
|
||||||
|
uniques := make(map[string]struct{})
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 10 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
var names []string
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
names = append(names, node.Node.Node)
|
||||||
|
}
|
||||||
|
key := strings.Join(names, "|")
|
||||||
|
uniques[key] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have to allow for the fact that there won't always be a unique
|
||||||
|
// shuffle each pass, so we just look for smell here without the test
|
||||||
|
// being flaky.
|
||||||
|
if len(uniques) < 50 {
|
||||||
|
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the health of a node to mark it critical.
|
||||||
|
setHealth := func(node string, health string) {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: node,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "foo",
|
||||||
|
Port: 8000,
|
||||||
|
Tags: []string{"dc1", "tag1"},
|
||||||
|
},
|
||||||
|
Check: &structs.HealthCheck{
|
||||||
|
Name: "failing",
|
||||||
|
Status: health,
|
||||||
|
ServiceID: "foo",
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
var reply struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "Catalog.Register", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
setHealth("node1", structs.HealthCritical)
|
||||||
|
|
||||||
|
// The failing node should be filtered.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 9 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
if node.Node.Node == "node1" {
|
||||||
|
t.Fatalf("bad: %v", node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upgrade it to a warning and re-query, should be 10 nodes again.
|
||||||
|
setHealth("node1", structs.HealthWarning)
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 10 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the query more picky so it excludes warning nodes.
|
||||||
|
query.Op = structs.PreparedQueryUpdate
|
||||||
|
query.Query.Service.OnlyPassing = true
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The node in the warning state should be filtered.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 9 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
if node.Node.Node == "node1" {
|
||||||
|
t.Fatalf("bad: %v", node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the query more picky by adding a tag filter. This just proves we
|
||||||
|
// call into the tag filter, it is tested more thoroughly in a separate
|
||||||
|
// test.
|
||||||
|
query.Query.Service.Tags = []string{"!tag3"}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The node in the warning state should be filtered as well as the node
|
||||||
|
// with the filtered tag.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 8 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
if node.Node.Node == "node1" || node.Node.Node == "node3" {
|
||||||
|
t.Fatalf("bad: %v", node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now fail everything in dc1 and we should get an empty list back.
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
setHealth(fmt.Sprintf("node%d", i+1), structs.HealthCritical)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 0 ||
|
||||||
|
reply.Datacenter != "dc1" || reply.Failovers != 0 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Modify the query to have it fail over to a bogus DC and then dc2.
|
||||||
|
query.Query.Service.Failover.Datacenters = []string{"bogus", "dc2"}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we should see 9 nodes from dc2 (we have the tag filter still).
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 9 ||
|
||||||
|
reply.Datacenter != "dc2" || reply.Failovers != 1 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
if node.Node.Node == "node3" {
|
||||||
|
t.Fatalf("bad: %v", node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the limit and query options are forwarded.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
Limit: 3,
|
||||||
|
QueryOptions: structs.QueryOptions{RequireConsistent: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 3 ||
|
||||||
|
reply.Datacenter != "dc2" || reply.Failovers != 1 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
if node.Node.Node == "node3" {
|
||||||
|
t.Fatalf("bad: %v", node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the remote shuffle looks like it's working.
|
||||||
|
uniques = make(map[string]struct{})
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 9 ||
|
||||||
|
reply.Datacenter != "dc2" || reply.Failovers != 1 ||
|
||||||
|
!reflect.DeepEqual(reply.DNS, query.Query.DNS) {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
var names []string
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
names = append(names, node.Node.Node)
|
||||||
|
}
|
||||||
|
key := strings.Join(names, "|")
|
||||||
|
uniques[key] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have to allow for the fact that there won't always be a unique
|
||||||
|
// shuffle each pass, so we just look for smell here without the test
|
||||||
|
// being flaky.
|
||||||
|
if len(uniques) < 50 {
|
||||||
|
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, take away the token's ability to read the service.
|
||||||
|
{
|
||||||
|
var rules = `
|
||||||
|
service "foo" {
|
||||||
|
policy = "deny"
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
req := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
ID: token,
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: rules,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "ACL.Apply", &req, &token); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now the query should be denied.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != 0 {
|
||||||
|
t.Fatalf("bad: %v", reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
|
func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
|
|
Loading…
Reference in New Issue