Merge pull request #1307 from hashicorp/f-rpc

Switch to net-rpc-msgpackrpc
This commit is contained in:
Ryan Uber 2015-10-15 14:57:54 -07:00
commit e115ada1e6
14 changed files with 404 additions and 397 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestACLEndpoint_Apply(t *testing.T) {
@ -17,10 +18,10 @@ func TestACLEndpoint_Apply(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -32,7 +33,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -56,7 +57,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
// Do a delete
arg.Op = structs.ACLDelete
arg.ACL.ID = out
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -77,10 +78,10 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -92,7 +93,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -112,7 +113,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
// Do an update
arg.ACL.ID = out
arg.ACL.Rules = `{"key": {"": {"policy": "deny"}}}`
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -134,7 +135,7 @@ func TestACLEndpoint_Update_PurgeCache(t *testing.T) {
// Do a delete
arg.Op = structs.ACLDelete
arg.ACL.Rules = ""
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -155,10 +156,10 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -171,7 +172,7 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out != "foobarbaz" {
@ -201,10 +202,10 @@ func TestACLEndpoint_Apply_Denied(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -215,7 +216,7 @@ func TestACLEndpoint_Apply_Denied(t *testing.T) {
},
}
var out string
err := client.Call("ACL.Apply", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
@ -228,10 +229,10 @@ func TestACLEndpoint_Apply_DeleteAnon(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -244,7 +245,7 @@ func TestACLEndpoint_Apply_DeleteAnon(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
err := client.Call("ACL.Apply", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out)
if err == nil || !strings.Contains(err.Error(), "delete anonymous") {
t.Fatalf("err: %v", err)
}
@ -257,10 +258,10 @@ func TestACLEndpoint_Apply_RootChange(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -273,7 +274,7 @@ func TestACLEndpoint_Apply_RootChange(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
err := client.Call("ACL.Apply", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out)
if err == nil || !strings.Contains(err.Error(), "root ACL") {
t.Fatalf("err: %v", err)
}
@ -286,10 +287,10 @@ func TestACLEndpoint_Get(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -301,7 +302,7 @@ func TestACLEndpoint_Get(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -310,7 +311,7 @@ func TestACLEndpoint_Get(t *testing.T) {
ACL: out,
}
var acls structs.IndexedACLs
if err := client.Call("ACL.Get", &getR, &acls); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Get", &getR, &acls); err != nil {
t.Fatalf("err: %v", err)
}
@ -333,10 +334,10 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
@ -348,7 +349,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -357,7 +358,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
ACL: out,
}
var acls structs.ACLPolicy
if err := client.Call("ACL.GetPolicy", &getR, &acls); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &acls); err != nil {
t.Fatalf("err: %v", err)
}
@ -371,7 +372,7 @@ func TestACLEndpoint_GetPolicy(t *testing.T) {
// Do a conditional lookup with etag
getR.ETag = acls.ETag
var out2 structs.ACLPolicy
if err := client.Call("ACL.GetPolicy", &getR, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.GetPolicy", &getR, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -390,10 +391,10 @@ func TestACLEndpoint_List(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
ids := []string{}
for i := 0; i < 5; i++ {
@ -407,7 +408,7 @@ func TestACLEndpoint_List(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
ids = append(ids, out)
@ -418,7 +419,7 @@ func TestACLEndpoint_List(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: "root"},
}
var acls structs.IndexedACLs
if err := client.Call("ACL.List", &getR, &acls); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.List", &getR, &acls); err != nil {
t.Fatalf("err: %v", err)
}
@ -450,16 +451,16 @@ func TestACLEndpoint_List_Denied(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
getR := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var acls structs.IndexedACLs
err := client.Call("ACL.List", &getR, &acls)
err := msgpackrpc.CallWithCodec(codec, "ACL.List", &getR, &acls)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}

View File

@ -18,7 +18,7 @@ func TestACL_Disabled(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
acl, err := s1.resolveToken("does not exist")
if err != nil {
@ -62,7 +62,7 @@ func TestACL_Authority_NotFound(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
acl, err := s1.resolveToken("does not exist")
if err == nil || err.Error() != aclNotFound {
@ -83,7 +83,7 @@ func TestACL_Authority_Found(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create a new token
arg := structs.ACLRequest{
@ -97,7 +97,7 @@ func TestACL_Authority_Found(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
@ -128,7 +128,7 @@ func TestACL_Authority_Anonymous_Found(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Resolve the token
acl, err := s1.resolveToken("")
@ -155,7 +155,7 @@ func TestACL_Authority_Master_Found(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Resolve the token
acl, err := s1.resolveToken("foobar")
@ -183,7 +183,7 @@ func TestACL_Authority_Management(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Resolve the token
acl, err := s1.resolveToken("foobar")
@ -230,7 +230,7 @@ func TestACL_NonAuthority_NotFound(t *testing.T) {
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// find the non-authoritative server
var nonAuth *Server
@ -279,7 +279,7 @@ func TestACL_NonAuthority_Found(t *testing.T) {
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create a new token
arg := structs.ACLRequest{
@ -293,7 +293,7 @@ func TestACL_NonAuthority_Found(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
@ -355,7 +355,7 @@ func TestACL_NonAuthority_Management(t *testing.T) {
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// find the non-authoritative server
var nonAuth *Server
@ -412,7 +412,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create a new token
arg := structs.ACLRequest{
@ -426,7 +426,7 @@ func TestACL_DownPolicy_Deny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
@ -486,7 +486,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create a new token
arg := structs.ACLRequest{
@ -500,7 +500,7 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
@ -562,7 +562,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create a new token
arg := structs.ACLRequest{
@ -576,7 +576,7 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
@ -637,8 +637,8 @@ func TestACL_MultiDC_Found(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, client.Call, "dc2")
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc2")
// Create a new token
arg := structs.ACLRequest{
@ -652,7 +652,7 @@ func TestACL_MultiDC_Found(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := client.Call("ACL.Apply", &arg, &id); err != nil {
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -11,14 +11,15 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestCatalogRegister(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -32,13 +33,13 @@ func TestCatalogRegister(t *testing.T) {
}
var out struct{}
err := client.Call("Catalog.Register", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
err := client.Call("Catalog.Register", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
@ -53,10 +54,10 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL
arg := structs.ACLRequest{
@ -70,7 +71,7 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -88,13 +89,13 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
}
var outR struct{}
err := client.Call("Catalog.Register", &argR, &outR)
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
argR.Service.Service = "foo"
err = client.Call("Catalog.Register", &argR, &outR)
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -104,14 +105,14 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client1 := rpcClient(t, s1)
defer client1.Close()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
client2 := rpcClient(t, s2)
defer client2.Close()
codec2 := rpcClient(t, s2)
defer codec2.Close()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
@ -120,15 +121,15 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client1.Call, "dc1")
testutil.WaitForLeader(t, client2.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc1")
// Use the follower as the client
var client *rpc.Client
var codec rpc.ClientCodec
if !s1.IsLeader() {
client = client1
codec = codec1
} else {
client = client2
codec = codec2
}
arg := structs.RegisterRequest{
@ -142,7 +143,7 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -151,8 +152,8 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
@ -165,7 +166,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc2")
testutil.WaitForLeader(t, s1.RPC, "dc2")
arg := structs.RegisterRequest{
Datacenter: "dc2", // Should forward through s1
@ -178,7 +179,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -187,8 +188,8 @@ func TestCatalogDeregister(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
arg := structs.DeregisterRequest{
Datacenter: "dc1",
@ -196,14 +197,14 @@ func TestCatalogDeregister(t *testing.T) {
}
var out struct{}
err := client.Call("Catalog.Deregister", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.Deregister", &arg, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
if err := client.Call("Catalog.Deregister", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Deregister", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -212,8 +213,8 @@ func TestCatalogListDatacenters(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
@ -226,10 +227,10 @@ func TestCatalogListDatacenters(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
var out []string
if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListDatacenters", struct{}{}, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -251,25 +252,25 @@ func TestCatalogListNodes(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedNodes
err := client.Call("Catalog.ListNodes", &args, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
testutil.WaitForResult(func() (bool, error) {
client.Call("Catalog.ListNodes", &args, &out)
msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out)
return len(out.Nodes) == 2, nil
}, func(err error) {
t.Fatalf("err: %v", err)
@ -291,14 +292,14 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client1 := rpcClient(t, s1)
defer client1.Close()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
client2 := rpcClient(t, s2)
defer client2.Close()
codec2 := rpcClient(t, s2)
defer codec2.Close()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
@ -307,18 +308,18 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client1.Call, "dc1")
testutil.WaitForLeader(t, client2.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc1")
// Use the follower as the client
var client *rpc.Client
var codec rpc.ClientCodec
if !s1.IsLeader() {
client = client1
codec = codec1
// Inject fake data on the follower!
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
} else {
client = client2
codec = codec2
// Inject fake data on the follower!
s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
@ -329,7 +330,7 @@ func TestCatalogListNodes_StaleRaad(t *testing.T) {
QueryOptions: structs.QueryOptions{AllowStale: true},
}
var out structs.IndexedNodes
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -355,14 +356,14 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client1 := rpcClient(t, s1)
defer client1.Close()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
client2 := rpcClient(t, s2)
defer client2.Close()
codec2 := rpcClient(t, s2)
defer codec2.Close()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
@ -371,16 +372,16 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client1.Call, "dc1")
testutil.WaitForLeader(t, client2.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc1")
// Use the leader as the client, kill the follower
var client *rpc.Client
var codec rpc.ClientCodec
if s1.IsLeader() {
client = client1
codec = codec1
s2.Shutdown()
} else {
client = client2
codec = codec2
s1.Shutdown()
}
@ -389,7 +390,7 @@ func TestCatalogListNodes_ConsistentRead_Fail(t *testing.T) {
QueryOptions: structs.QueryOptions{RequireConsistent: true},
}
var out structs.IndexedNodes
if err := client.Call("Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); !strings.HasPrefix(err.Error(), "leadership lost") {
t.Fatalf("err: %v", err)
}
@ -405,14 +406,14 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client1 := rpcClient(t, s1)
defer client1.Close()
codec1 := rpcClient(t, s1)
defer codec1.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
client2 := rpcClient(t, s2)
defer client2.Close()
codec2 := rpcClient(t, s2)
defer codec2.Close()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
@ -421,15 +422,15 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client1.Call, "dc1")
testutil.WaitForLeader(t, client2.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
testutil.WaitForLeader(t, s2.RPC, "dc1")
// Use the leader as the client, kill the follower
var client *rpc.Client
var codec rpc.ClientCodec
if s1.IsLeader() {
client = client1
codec = codec1
} else {
client = client2
codec = codec2
}
args := structs.DCSpecificRequest{
@ -437,7 +438,7 @@ func TestCatalogListNodes_ConsistentRead(t *testing.T) {
QueryOptions: structs.QueryOptions{RequireConsistent: true},
}
var out structs.IndexedNodes
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -453,8 +454,8 @@ func BenchmarkCatalogListNodes(t *testing.B) {
dir1, s1 := testServer(nil)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(nil, s1)
defer client.Close()
codec := rpcClient(nil, s1)
defer codec.Close()
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
@ -464,7 +465,7 @@ func BenchmarkCatalogListNodes(t *testing.B) {
}
for i := 0; i < t.N; i++ {
var out structs.IndexedNodes
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -474,25 +475,25 @@ func TestCatalogListServices(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedServices
err := client.Call("Catalog.ListServices", &args, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -520,18 +521,18 @@ func TestCatalogListServices_Blocking(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedServices
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Run the query
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -549,7 +550,7 @@ func TestCatalogListServices_Blocking(t *testing.T) {
// Re-run the query
out = structs.IndexedServices{}
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -573,18 +574,18 @@ func TestCatalogListServices_Timeout(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedServices
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Run the query
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -595,7 +596,7 @@ func TestCatalogListServices_Timeout(t *testing.T) {
// Re-run the query
start := time.Now()
out = structs.IndexedServices{}
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -614,8 +615,8 @@ func TestCatalogListServices_Stale(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.DCSpecificRequest{
Datacenter: "dc1",
@ -628,7 +629,7 @@ func TestCatalogListServices_Stale(t *testing.T) {
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
// Run the query, do not wait for leader!
if err := client.Call("Catalog.ListServices", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -647,8 +648,8 @@ func TestCatalogListServiceNodes(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
@ -657,18 +658,18 @@ func TestCatalogListServiceNodes(t *testing.T) {
TagFilter: false,
}
var out structs.IndexedServiceNodes
err := client.Call("Catalog.ServiceNodes", &args, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -680,7 +681,7 @@ func TestCatalogListServiceNodes(t *testing.T) {
args.TagFilter = true
out = structs.IndexedServiceNodes{}
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.ServiceNodes) != 0 {
@ -692,27 +693,27 @@ func TestCatalogNodeServices(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "foo",
}
var out structs.IndexedNodeServices
err := client.Call("Catalog.NodeServices", &args, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", nil, "127.0.0.1", 80, false})
if err := client.Call("Catalog.NodeServices", &args, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -736,8 +737,8 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -751,14 +752,14 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
}
var out struct{}
err := client.Call("Catalog.Register", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err == nil || err.Error() != "No cluster leader" {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -768,7 +769,7 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
ServiceName: "web",
}
var out2 structs.IndexedServiceNodes
if err := client.Call("Catalog.ServiceNodes", query, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", query, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -778,15 +779,15 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
}
}
func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *rpc.Client) {
func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, codec rpc.ClientCodec) {
dir, srv = testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
client = rpcClient(t, srv)
testutil.WaitForLeader(t, client.Call, "dc1")
codec = rpcClient(t, srv)
testutil.WaitForLeader(t, srv.RPC, "dc1")
// Create a new token
arg := structs.ACLRequest{
@ -799,7 +800,7 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := client.Call("ACL.Apply", &arg, &token); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &token); err != nil {
t.Fatalf("err: %v", err)
}
@ -820,7 +821,7 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := client.Call("Catalog.Register", &regArg, nil); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &regArg, nil); err != nil {
t.Fatalf("err: %s", err)
}
@ -840,24 +841,24 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, client *
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := client.Call("Catalog.Register", &regArg, nil); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &regArg, nil); err != nil {
t.Fatalf("err: %s", err)
}
return
}
func TestCatalog_ListServices_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedServices{}
if err := client.Call("Catalog.ListServices", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := reply.Services["foo"]; !ok {
@ -869,10 +870,10 @@ func TestCatalog_ListServices_FilterACL(t *testing.T) {
}
func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.ServiceSpecificRequest{
Datacenter: "dc1",
@ -880,7 +881,7 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedServiceNodes{}
if err := client.Call("Catalog.ServiceNodes", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
found := false
@ -901,7 +902,7 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply = structs.IndexedServiceNodes{}
if err := client.Call("Catalog.ServiceNodes", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
for _, sn := range reply.ServiceNodes {
@ -912,10 +913,10 @@ func TestCatalog_ServiceNodes_FilterACL(t *testing.T) {
}
func TestCatalog_NodeServices_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.NodeSpecificRequest{
Datacenter: "dc1",
@ -923,7 +924,7 @@ func TestCatalog_NodeServices_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedNodeServices{}
if err := client.Call("Catalog.NodeServices", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.NodeServices", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
found := false

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
)
@ -320,13 +321,13 @@ func TestClientServer_UserEvent(t *testing.T) {
})
// Fire the user event
client := rpcClient(t, s1)
codec := rpcClient(t, s1)
event := structs.EventFireRequest{
Name: "foo",
Datacenter: "dc1",
Payload: []byte("baz"),
}
if err := client.Call("Internal.EventFire", &event, nil); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -14,6 +14,9 @@ import (
"github.com/hashicorp/raft"
)
// msgpackHandle is a shared handle for encoding/decoding msgpack payloads
var msgpackHandle = &codec.MsgpackHandle{}
// consulFSM implements a finite state machine that is used
// along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package.

View File

@ -1,20 +1,22 @@
package consul
import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestHealth_ChecksInState(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -26,7 +28,7 @@ func TestHealth_ChecksInState(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -35,7 +37,7 @@ func TestHealth_ChecksInState(t *testing.T) {
Datacenter: "dc1",
State: structs.HealthPassing,
}
if err := client.Call("Health.ChecksInState", &inState, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -57,10 +59,10 @@ func TestHealth_NodeChecks(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -72,7 +74,7 @@ func TestHealth_NodeChecks(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -81,7 +83,7 @@ func TestHealth_NodeChecks(t *testing.T) {
Datacenter: "dc1",
Node: "foo",
}
if err := client.Call("Health.NodeChecks", &node, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &node, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -98,10 +100,10 @@ func TestHealth_ServiceChecks(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -118,7 +120,7 @@ func TestHealth_ServiceChecks(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -127,7 +129,7 @@ func TestHealth_ServiceChecks(t *testing.T) {
Datacenter: "dc1",
ServiceName: "db",
}
if err := client.Call("Health.ServiceChecks", &node, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &node, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -144,10 +146,10 @@ func TestHealth_ServiceNodes(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -165,7 +167,7 @@ func TestHealth_ServiceNodes(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -184,7 +186,7 @@ func TestHealth_ServiceNodes(t *testing.T) {
ServiceID: "db",
},
}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -195,7 +197,7 @@ func TestHealth_ServiceNodes(t *testing.T) {
ServiceTag: "master",
TagFilter: false,
}
if err := client.Call("Health.ServiceNodes", &req, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -224,10 +226,10 @@ func TestHealth_ServiceNodes(t *testing.T) {
}
func TestHealth_NodeChecks_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.NodeSpecificRequest{
Datacenter: "dc1",
@ -235,7 +237,7 @@ func TestHealth_NodeChecks_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedHealthChecks{}
if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
found := false
@ -253,10 +255,10 @@ func TestHealth_NodeChecks_FilterACL(t *testing.T) {
}
func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.ServiceSpecificRequest{
Datacenter: "dc1",
@ -264,7 +266,7 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedHealthChecks{}
if err := client.Call("Health.ServiceChecks", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
found := false
@ -280,7 +282,7 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
opt.ServiceName = "bar"
reply = structs.IndexedHealthChecks{}
if err := client.Call("Health.ServiceChecks", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
if len(reply.HealthChecks) != 0 {
@ -289,10 +291,10 @@ func TestHealth_ServiceChecks_FilterACL(t *testing.T) {
}
func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.ServiceSpecificRequest{
Datacenter: "dc1",
@ -300,7 +302,7 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedCheckServiceNodes{}
if err := client.Call("Health.ServiceNodes", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
if len(reply.Nodes) != 1 {
@ -309,7 +311,7 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
opt.ServiceName = "bar"
reply = structs.IndexedCheckServiceNodes{}
if err := client.Call("Health.ServiceNodes", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
if len(reply.Nodes) != 0 {
@ -318,10 +320,10 @@ func TestHealth_ServiceNodes_FilterACL(t *testing.T) {
}
func TestHealth_ChecksInState_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.ChecksInStateRequest{
Datacenter: "dc1",
@ -329,7 +331,7 @@ func TestHealth_ChecksInState_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedHealthChecks{}
if err := client.Call("Health.ChecksInState", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -3,20 +3,22 @@ package consul
import (
"encoding/base64"
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestInternal_NodeInfo(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -34,7 +36,7 @@ func TestInternal_NodeInfo(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -43,7 +45,7 @@ func TestInternal_NodeInfo(t *testing.T) {
Datacenter: "dc1",
Node: "foo",
}
if err := client.Call("Internal.NodeInfo", &req, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -66,10 +68,10 @@ func TestInternal_NodeDump(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
@ -87,7 +89,7 @@ func TestInternal_NodeDump(t *testing.T) {
},
}
var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -106,7 +108,7 @@ func TestInternal_NodeDump(t *testing.T) {
ServiceID: "db",
},
}
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -114,7 +116,7 @@ func TestInternal_NodeDump(t *testing.T) {
req := structs.DCSpecificRequest{
Datacenter: "dc1",
}
if err := client.Call("Internal.NodeDump", &req, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -165,17 +167,17 @@ func TestInternal_KeyringOperation(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
var out structs.KeyringResponses
req := structs.KeyringRequest{
Operation: structs.KeyringList,
Datacenter: "dc1",
}
if err := client.Call("Internal.KeyringOperation", &req, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -218,7 +220,7 @@ func TestInternal_KeyringOperation(t *testing.T) {
req2 := structs.KeyringRequest{
Operation: structs.KeyringList,
}
if err := client.Call("Internal.KeyringOperation", &req2, &out2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Internal.KeyringOperation", &req2, &out2); err != nil {
t.Fatalf("err: %v", err)
}
@ -240,10 +242,10 @@ func TestInternal_KeyringOperation(t *testing.T) {
}
func TestInternal_NodeInfo_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.NodeSpecificRequest{
Datacenter: "dc1",
@ -251,7 +253,7 @@ func TestInternal_NodeInfo_FilterACL(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedNodeDump{}
if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
for _, info := range reply.Dump {
@ -284,17 +286,17 @@ func TestInternal_NodeInfo_FilterACL(t *testing.T) {
}
func TestInternal_NodeDump_FilterACL(t *testing.T) {
dir, token, srv, client := testACLFilterServer(t)
dir, token, srv, codec := testACLFilterServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer client.Close()
defer codec.Close()
opt := structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: token},
}
reply := structs.IndexedNodeDump{}
if err := client.Call("Health.NodeChecks", &opt, &reply); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Health.NodeChecks", &opt, &reply); err != nil {
t.Fatalf("err: %s", err)
}
for _, info := range reply.Dump {
@ -336,10 +338,10 @@ func TestInternal_EventFire_Token(t *testing.T) {
defer os.RemoveAll(dir)
defer srv.Shutdown()
client := rpcClient(t, srv)
defer client.Close()
codec := rpcClient(t, srv)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, srv.RPC, "dc1")
// No token is rejected
event := structs.EventFireRequest{
@ -347,14 +349,14 @@ func TestInternal_EventFire_Token(t *testing.T) {
Datacenter: "dc1",
Payload: []byte("nope"),
}
err := client.Call("Internal.EventFire", &event, nil)
err := msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil)
if err == nil || err.Error() != permissionDenied {
t.Fatalf("bad: %s", err)
}
// Root token is allowed to fire
event.Token = "root"
err = client.Call("Internal.EventFire", &event, nil)
err = msgpackrpc.CallWithCodec(codec, "Internal.EventFire", &event, nil)
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -8,16 +8,17 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestKVS_Apply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.KVSRequest{
Datacenter: "dc1",
@ -29,7 +30,7 @@ func TestKVS_Apply(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -47,7 +48,7 @@ func TestKVS_Apply(t *testing.T) {
arg.Op = structs.KVSCAS
arg.DirEnt.ModifyIndex = d.ModifyIndex
arg.DirEnt.Flags = 43
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -74,10 +75,10 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL
arg := structs.ACLRequest{
@ -91,7 +92,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -108,7 +109,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: id},
}
var outR bool
err := client.Call("KVS.Apply", &argR, &outR)
err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &argR, &outR)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
@ -122,7 +123,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
},
WriteRequest: structs.WriteRequest{Token: id},
}
err = client.Call("KVS.Apply", &argR, &outR)
err = msgpackrpc.CallWithCodec(codec, "KVS.Apply", &argR, &outR)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
@ -132,10 +133,10 @@ func TestKVS_Get(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.KVSRequest{
Datacenter: "dc1",
@ -147,7 +148,7 @@ func TestKVS_Get(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -156,7 +157,7 @@ func TestKVS_Get(t *testing.T) {
Key: "test",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.Get", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -183,10 +184,10 @@ func TestKVS_Get_ACLDeny(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.KVSRequest{
Datacenter: "dc1",
@ -199,7 +200,7 @@ func TestKVS_Get_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -208,7 +209,7 @@ func TestKVS_Get_ACLDeny(t *testing.T) {
Key: "zip",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.Get", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -224,10 +225,10 @@ func TestKVSEndpoint_List(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
keys := []string{
"/test/key1",
@ -245,7 +246,7 @@ func TestKVSEndpoint_List(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -255,7 +256,7 @@ func TestKVSEndpoint_List(t *testing.T) {
Key: "/test",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -283,10 +284,10 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
keys := []string{
"/test/key1",
@ -304,7 +305,7 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -314,7 +315,7 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
Key: "/test",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -326,8 +327,8 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSDelete,
@ -336,14 +337,14 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Re-run the query
dirent = structs.IndexedDirEntries{}
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -382,10 +383,10 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
keys := []string{
"abe",
@ -406,7 +407,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -422,7 +423,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -433,7 +434,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: id},
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -462,10 +463,10 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
keys := []string{
"/test/key1",
@ -483,7 +484,7 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -494,7 +495,7 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
Seperator: "/",
}
var dirent structs.IndexedKeyList
if err := client.Call("KVS.ListKeys", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -523,10 +524,10 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
keys := []string{
"abe",
@ -547,7 +548,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
@ -563,7 +564,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out string
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -575,7 +576,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
QueryOptions: structs.QueryOptions{Token: id},
}
var dirent structs.IndexedKeyList
if err := client.Call("KVS.ListKeys", &getR, &dirent); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
@ -597,10 +598,10 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock
state := s1.fsm.State()
@ -643,7 +644,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out != false {
@ -654,7 +655,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
time.Sleep(50 * time.Millisecond)
// Should acquire
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out != true {

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
)
@ -28,8 +29,7 @@ func TestLeader_RegisterMember(t *testing.T) {
t.Fatalf("err: %v", err)
}
client := rpcClient(t, s1)
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Client should be registered
state := s1.fsm.State()
@ -77,8 +77,7 @@ func TestLeader_FailedMember(t *testing.T) {
defer os.RemoveAll(dir2)
defer c1.Shutdown()
client := rpcClient(t, s1)
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
@ -212,8 +211,7 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Register a non-existing member
dead := structs.RegisterRequest{
@ -520,9 +518,9 @@ func TestLeader_ReapTombstones(t *testing.T) {
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
client := rpcClient(t, s1)
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create a KV entry
arg := structs.KVSRequest{
@ -534,13 +532,13 @@ func TestLeader_ReapTombstones(t *testing.T) {
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the KV entry (tombstoned)
arg.Op = structs.KVSDelete
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -11,14 +11,11 @@ import (
"time"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
)
// msgpackHandle is a shared handle for encoding/decoding of RPC messages
var msgpackHandle = &codec.MsgpackHandle{}
// muxSession is used to provide an interface for either muxado or yamux
type muxSession interface {
Open() (net.Conn, error)
@ -40,12 +37,12 @@ func (w *muxadoWrapper) Close() error {
// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream net.Conn
client *rpc.Client
codec rpc.ClientCodec
}
func (sc *StreamClient) Close() {
sc.stream.Close()
sc.client.Close()
sc.codec.Close()
}
// Conn is a pooled connection to a Consul server
@ -88,13 +85,12 @@ func (c *Conn) getClient() (*StreamClient, error) {
}
// Create the RPC client
cc := codec.GoRpc.ClientCodec(stream, msgpackHandle)
client := rpc.NewClientWithCodec(cc)
codec := msgpackrpc.NewClientCodec(stream)
// Return a new stream client
sc := &StreamClient{
stream: stream,
client: client,
codec: codec,
}
return sc, nil
}
@ -390,7 +386,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg
}
// Make the RPC call
err = sc.client.Call(method, args, reply)
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
if err != nil {
sc.Close()
p.releaseConn(conn)

View File

@ -11,7 +11,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
)
@ -158,7 +158,7 @@ func (s *Server) handleMultiplexV2(conn net.Conn) {
// handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) {
defer conn.Close()
rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle)
rpcCodec := msgpackrpc.NewServerCodec(conn)
for {
select {
case <-s.shutdownCh:

View File

@ -7,16 +7,17 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestSessionEndpoint_Apply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
@ -30,7 +31,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -54,7 +55,7 @@ func TestSessionEndpoint_Apply(t *testing.T) {
// Do a delete
arg.Op = structs.SessionDestroy
arg.Session.ID = out
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -72,10 +73,10 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
@ -90,7 +91,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
@ -117,7 +118,7 @@ func TestSessionEndpoint_DeleteApply(t *testing.T) {
// Do a delete
arg.Op = structs.SessionDestroy
arg.Session.ID = out
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -135,10 +136,10 @@ func TestSessionEndpoint_Get(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
arg := structs.SessionRequest{
@ -149,7 +150,7 @@ func TestSessionEndpoint_Get(t *testing.T) {
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -158,7 +159,7 @@ func TestSessionEndpoint_Get(t *testing.T) {
Session: out,
}
var sessions structs.IndexedSessions
if err := client.Call("Session.Get", &getR, &sessions); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Get", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
@ -178,10 +179,10 @@ func TestSessionEndpoint_List(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
ids := []string{}
@ -194,7 +195,7 @@ func TestSessionEndpoint_List(t *testing.T) {
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
ids = append(ids, out)
@ -204,7 +205,7 @@ func TestSessionEndpoint_List(t *testing.T) {
Datacenter: "dc1",
}
var sessions structs.IndexedSessions
if err := client.Call("Session.List", &getR, &sessions); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
@ -229,10 +230,10 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
arg := structs.SessionRequest{
@ -244,7 +245,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -256,7 +257,7 @@ func TestSessionEndpoint_ApplyTimers(t *testing.T) {
// Destroy the session
arg.Op = structs.SessionDestroy
arg.Session.ID = out
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -270,10 +271,10 @@ func TestSessionEndpoint_Renew(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
TTL := "10s" // the minimum allowed ttl
ttl := 10 * time.Second
@ -289,7 +290,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
ids = append(ids, out)
@ -305,7 +306,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
}
var sessions structs.IndexedSessions
if err := client.Call("Session.List", &getR, &sessions); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
@ -339,7 +340,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
Session: ids[i],
}
var session structs.IndexedSessions
if err := client.Call("Session.Renew", &renewR, &session); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Renew", &renewR, &session); err != nil {
t.Fatalf("err: %v", err)
}
@ -366,7 +367,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
time.Sleep((ttl * structs.SessionTTLMultiplier) * 2.0 / 3.0)
var sessionsL1 structs.IndexedSessions
if err := client.Call("Session.List", &getR, &sessionsL1); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessionsL1); err != nil {
t.Fatalf("err: %v", err)
}
@ -400,7 +401,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
time.Sleep(ttl * structs.SessionTTLMultiplier)
var sessionsL2 structs.IndexedSessions
if err := client.Call("Session.List", &getR, &sessionsL2); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.List", &getR, &sessionsL2); err != nil {
t.Fatalf("err: %v", err)
}
@ -430,10 +431,10 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, structs.Node{"bar", "127.0.0.1"})
@ -450,7 +451,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
arg.Session.Node = "foo"
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if i < 5 {
@ -463,7 +464,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
Node: "foo",
}
var sessions structs.IndexedSessions
if err := client.Call("Session.NodeSessions", &getR, &sessions); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.NodeSessions", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
@ -488,10 +489,10 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.SessionRequest{
Datacenter: "dc1",
@ -506,7 +507,7 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
arg.Session.TTL = "10z"
var out string
err := client.Call("Session.Apply", &arg, &out)
err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out)
if err == nil {
t.Fatal("expected error")
}
@ -517,7 +518,7 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
// less than SessionTTLMin
arg.Session.TTL = "5s"
err = client.Call("Session.Apply", &arg, &out)
err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out)
if err == nil {
t.Fatal("expected error")
}
@ -528,7 +529,7 @@ func TestSessionEndpoint_Apply_BadTTL(t *testing.T) {
// more than SessionTTLMax
arg.Session.TTL = "4000s"
err = client.Call("Session.Apply", &arg, &out)
err = msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &out)
if err == nil {
t.Fatal("expected error")
}

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestInitializeSessionTimers(t *testing.T) {
@ -310,8 +311,8 @@ func TestServer_SessionTTL_Failover(t *testing.T) {
t.Fatalf("Should have a leader")
}
client := rpcClient(t, leader)
defer client.Close()
codec := rpcClient(t, leader)
defer codec.Close()
// Register a node
node := structs.RegisterRequest{
@ -334,7 +335,7 @@ func TestServer_SessionTTL_Failover(t *testing.T) {
},
}
var id1 string
if err := client.Call("Session.Apply", &arg, &id1); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Session.Apply", &arg, &id1); err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -1,47 +1,47 @@
package consul
import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/go-msgpack/codec"
"net"
"net/rpc"
"os"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func rpcClient(t *testing.T, s *Server) *rpc.Client {
func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
addr := s.config.RPCAddr
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
// Write the Consul RPC byte to set the mode
conn.Write([]byte{byte(rpcConsul)})
cc := codec.GoRpc.ClientCodec(conn, msgpackHandle)
return rpc.NewClientWithCodec(cc)
return msgpackrpc.NewClientCodec(conn)
}
func TestStatusLeader(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
arg := struct{}{}
var leader string
if err := client.Call("Status.Leader", arg, &leader); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Status.Leader", arg, &leader); err != nil {
t.Fatalf("err: %v", err)
}
if leader != "" {
t.Fatalf("unexpected leader: %v", leader)
}
testutil.WaitForLeader(t, client.Call, "dc1")
testutil.WaitForLeader(t, s1.RPC, "dc1")
if err := client.Call("Status.Leader", arg, &leader); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Status.Leader", arg, &leader); err != nil {
t.Fatalf("err: %v", err)
}
if leader == "" {
@ -53,12 +53,12 @@ func TestStatusPeers(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
codec := rpcClient(t, s1)
defer codec.Close()
arg := struct{}{}
var peers []string
if err := client.Call("Status.Peers", arg, &peers); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Status.Peers", arg, &peers); err != nil {
t.Fatalf("err: %v", err)
}
if len(peers) != 1 {