Adds a leader forwarding case for prepared queries.

This commit is contained in:
James Phillips 2015-11-10 14:46:08 -08:00
parent fa414a2092
commit 7ded6c7a4a

View File

@ -1,6 +1,8 @@
package consul
import (
"fmt"
"net/rpc"
"os"
"reflect"
"strings"
@ -494,6 +496,74 @@ func TestPreparedQuery_Apply_ACLDeny(t *testing.T) {
}
}
func TestPreparedQuery_Apply_ForwardLeader(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()
// Try to join.
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc1")
// Use the follower as the client.
var codec rpc.ClientCodec
if !s1.IsLeader() {
codec = codec1
} else {
codec = codec2
}
// Set up a node and service in the catalog.
{
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "redis",
Tags: []string{"master"},
Port: 8000,
},
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply)
if err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a bare bones query.
query := structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Service: structs.ServiceQuery{
Service: "redis",
},
},
}
// Make sure the apply works even when forwarded through the non-leader.
var reply string
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &query, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestPreparedQuery_parseQuery(t *testing.T) {
query := &structs.PreparedQuery{}