Adds execute leader forward test for prepared queries.

This commit is contained in:
James Phillips 2015-11-10 15:16:41 -08:00
parent 7ded6c7a4a
commit a9e9d5e311

View File

@ -992,3 +992,134 @@ func TestPreparedQuery_List(t *testing.T) {
}
}
}
func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()
// Try to join.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc1")
// Use the follower as the client.
var codec rpc.ClientCodec
if !s1.IsLeader() {
codec = codec1
} else {
codec = codec2
}
// Set up a node and service in the catalog.
{
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "redis",
Tags: []string{"master"},
Port: 8000,
},
}
var reply struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a bare bones query.
query := structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Service: structs.ServiceQuery{
Service: "redis",
},
},
}
var reply string
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply); err != nil {
t.Fatalf("err: %v", err)
}
// Execute it through the follower.
{
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: reply,
}
var reply structs.PreparedQueryExecuteResponse
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if len(reply.Nodes) != 1 {
t.Fatalf("bad: %v", reply)
}
}
// Execute it through the follower with consistency turned on.
{
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: reply,
QueryOptions: structs.QueryOptions{RequireConsistent: true},
}
var reply structs.PreparedQueryExecuteResponse
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if len(reply.Nodes) != 1 {
t.Fatalf("bad: %v", reply)
}
}
// Remote execute it through the follower.
{
req := structs.PreparedQueryExecuteRemoteRequest{
Datacenter: "dc1",
Query: *query.Query,
}
var reply structs.PreparedQueryExecuteResponse
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.ExecuteRemote", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if len(reply.Nodes) != 1 {
t.Fatalf("bad: %v", reply)
}
}
// Remote execute it through the follower with consistency turned on.
{
req := structs.PreparedQueryExecuteRemoteRequest{
Datacenter: "dc1",
Query: *query.Query,
QueryOptions: structs.QueryOptions{RequireConsistent: true},
}
var reply structs.PreparedQueryExecuteResponse
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.ExecuteRemote", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if len(reply.Nodes) != 1 {
t.Fatalf("bad: %v", reply)
}
}
}