diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 86ac975171..4e7a7e9751 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -249,8 +249,8 @@ func TestCatalogListDatacenters_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() dir2, s2 := testServerDC(t, "dc2") defer os.RemoveAll(dir2) @@ -269,10 +269,10 @@ func TestCatalogListDatacenters_DistanceSort(t *testing.T) { if _, err := s3.JoinWAN([]string{addr}); err != nil { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") var out []string - if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListDatacenters", struct{}{}, &out); err != nil { t.Fatalf("err: %v", err) } @@ -508,10 +508,10 @@ func TestCatalogListNodes_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") if err := s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } @@ -542,7 +542,7 @@ func TestCatalogListNodes_DistanceSort(t *testing.T) { } var out structs.IndexedNodes testutil.WaitForResult(func() (bool, error) { - client.Call("Catalog.ListNodes", &args, &out) + msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) return len(out.Nodes) == 5, nil }, func(err error) { t.Fatalf("err: %v", err) @@ -570,7 +570,7 @@ func TestCatalogListNodes_DistanceSort(t *testing.T) { Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"}, } testutil.WaitForResult(func() (bool, error) { - client.Call("Catalog.ListNodes", &args, &out) + msgpackrpc.CallWithCodec(codec, "Catalog.ListNodes", &args, &out) return len(out.Nodes) == 5, nil }, func(err error) { t.Fatalf("err: %v", err) @@ -854,30 +854,30 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() args := structs.ServiceSpecificRequest{ Datacenter: "dc1", ServiceName: "db", } var out structs.IndexedServiceNodes - err := client.Call("Catalog.ServiceNodes", &args, &out) + err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out) if err == nil || err.Error() != "No cluster leader" { t.Fatalf("err: %v", err) } - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // Add a few nodes for the associated services. s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"}) - s1.fsm.State().EnsureService(2, "aaa", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000}) + s1.fsm.State().EnsureService(2, "aaa", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false}) s1.fsm.State().EnsureNode(3, structs.Node{"foo", "127.0.0.2"}) - s1.fsm.State().EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.2", 5000}) + s1.fsm.State().EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.2", 5000, false}) s1.fsm.State().EnsureNode(5, structs.Node{"bar", "127.0.0.3"}) - s1.fsm.State().EnsureService(6, "bar", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.3", 5000}) + s1.fsm.State().EnsureService(6, "bar", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.3", 5000, false}) s1.fsm.State().EnsureNode(7, structs.Node{"baz", "127.0.0.4"}) - s1.fsm.State().EnsureService(8, "baz", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.4", 5000}) + s1.fsm.State().EnsureService(8, "baz", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.4", 5000, false}) // Set all but one of the nodes to known coordinates. updates := []structs.Coordinate{ @@ -891,7 +891,7 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) { // Query with no given source node, should get the natural order from // the index. - if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } if len(out.ServiceNodes) != 4 { @@ -917,7 +917,7 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) { ServiceName: "db", Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"}, } - if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) } if len(out.ServiceNodes) != 4 { diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index 977fd303f2..dc3c963ad0 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/coordinate" ) @@ -52,9 +53,9 @@ func TestCoordinate_Update(t *testing.T) { } defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + testutil.WaitForLeader(t, s1.RPC, "dc1") // Send an update for the first node. arg1 := structs.CoordinateUpdateRequest{ @@ -63,7 +64,7 @@ func TestCoordinate_Update(t *testing.T) { Coord: generateRandomCoordinate(), } var out struct{} - if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { t.Fatalf("err: %v", err) } @@ -73,7 +74,7 @@ func TestCoordinate_Update(t *testing.T) { Node: "node2", Coord: generateRandomCoordinate(), } - if err := client.Call("Coordinate.Update", &arg2, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { t.Fatalf("err: %v", err) } @@ -121,7 +122,7 @@ func TestCoordinate_Update(t *testing.T) { for i := 0; i < spamLen; i++ { arg1.Node = fmt.Sprintf("bogusnode%d", i) arg1.Coord = generateRandomCoordinate() - if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -146,7 +147,7 @@ func TestCoordinate_Update(t *testing.T) { // Finally, send a coordinate with the wrong dimensionality to make sure // there are no panics, and that it gets rejected. arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec)) - err = client.Call("Coordinate.Update", &arg2, &out) + err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out) if err == nil || !strings.Contains(err.Error(), "rejected bad coordinate") { t.Fatalf("should have failed with an error, got %v", err) } @@ -157,9 +158,9 @@ func TestCoordinate_Get(t *testing.T) { defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + testutil.WaitForLeader(t, s1.RPC, "dc1") arg := structs.CoordinateUpdateRequest{ Datacenter: "dc1", @@ -170,7 +171,7 @@ func TestCoordinate_Get(t *testing.T) { // Send an initial update, waiting a little while for the batch update // to run. var out struct{} - if err := client.Call("Coordinate.Update", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg, &out); err != nil { t.Fatalf("err: %v", err) } time.Sleep(2 * s1.config.CoordinateUpdatePeriod) @@ -181,20 +182,20 @@ func TestCoordinate_Get(t *testing.T) { Node: "node1", } coord := structs.IndexedCoordinate{} - if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Get", &arg2, &coord); err != nil { t.Fatalf("err: %v", err) } verifyCoordinatesEqual(t, coord.Coord, arg.Coord) // Send another coordinate update, waiting after for the flush. arg.Coord = generateRandomCoordinate() - if err := client.Call("Coordinate.Update", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg, &out); err != nil { t.Fatalf("err: %v", err) } time.Sleep(2 * s1.config.CoordinateUpdatePeriod) // Now re-query and make sure the results are fresh. - if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Get", &arg2, &coord); err != nil { t.Fatalf("err: %v", err) } verifyCoordinatesEqual(t, coord.Coord, arg.Coord) @@ -204,17 +205,17 @@ func TestCoordinate_ListDatacenters(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") // It's super hard to force the Serfs into a known configuration of // coordinates, so the best we can do is make sure our own DC shows // up in the list with the proper coordinates. The guts of the algorithm // are extensively tested in rtt_test.go using a mock database. var out []structs.DatacenterMap - if err := client.Call("Coordinate.ListDatacenters", struct{}{}, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListDatacenters", struct{}{}, &out); err != nil { t.Fatalf("err: %v", err) } if len(out) != 1 || @@ -235,9 +236,9 @@ func TestCoordinate_ListNodes(t *testing.T) { defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + testutil.WaitForLeader(t, s1.RPC, "dc1") // Send coordinate updates for a few nodes, waiting a little while for // the batch update to run. @@ -247,7 +248,7 @@ func TestCoordinate_ListNodes(t *testing.T) { Coord: generateRandomCoordinate(), } var out struct{} - if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { t.Fatalf("err: %v", err) } @@ -256,7 +257,7 @@ func TestCoordinate_ListNodes(t *testing.T) { Node: "bar", Coord: generateRandomCoordinate(), } - if err := client.Call("Coordinate.Update", &arg2, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { t.Fatalf("err: %v", err) } @@ -265,7 +266,7 @@ func TestCoordinate_ListNodes(t *testing.T) { Node: "baz", Coord: generateRandomCoordinate(), } - if err := client.Call("Coordinate.Update", &arg3, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil { t.Fatalf("err: %v", err) } time.Sleep(2 * s1.config.CoordinateUpdatePeriod) @@ -275,7 +276,7 @@ func TestCoordinate_ListNodes(t *testing.T) { Datacenter: "dc1", } resp := structs.IndexedCoordinates{} - if err := client.Call("Coordinate.ListNodes", &arg, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { t.Fatalf("err: %v", err) } if len(resp.Coordinates) != 3 || diff --git a/consul/health_endpoint_test.go b/consul/health_endpoint_test.go index e9a5503a30..082f4cf173 100644 --- a/consul/health_endpoint_test.go +++ b/consul/health_endpoint_test.go @@ -60,10 +60,10 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") if err := s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.2"}); err != nil { t.Fatalf("err: %v", err) } @@ -89,12 +89,12 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) { } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } arg.Node = "bar" - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -108,7 +108,7 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) { Node: "foo", }, } - if err := client.Call("Health.ChecksInState", &inState, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out2); err != nil { t.Fatalf("err: %v", err) } checks := out2.HealthChecks @@ -121,7 +121,7 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) { // Now query relative to bar to make sure it shows up first. inState.Source.Node = "bar" - if err := client.Call("Health.ChecksInState", &inState, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out2); err != nil { t.Fatalf("err: %v", err) } checks = out2.HealthChecks @@ -224,10 +224,10 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") if err := s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.2"}); err != nil { t.Fatalf("err: %v", err) } @@ -258,12 +258,12 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) { } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } arg.Node = "bar" - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -277,7 +277,7 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) { Node: "foo", }, } - if err := client.Call("Health.ServiceChecks", &node, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &node, &out2); err != nil { t.Fatalf("err: %v", err) } checks := out2.HealthChecks @@ -293,7 +293,7 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) { // Now query relative to bar to make sure it shows up first. node.Source.Node = "bar" - if err := client.Call("Health.ServiceChecks", &node, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &node, &out2); err != nil { t.Fatalf("err: %v", err) } checks = out2.HealthChecks @@ -395,10 +395,10 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + codec := rpcClient(t, s1) + defer codec.Close() - testutil.WaitForLeader(t, client.Call, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc1") if err := s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.2"}); err != nil { t.Fatalf("err: %v", err) } @@ -429,12 +429,12 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) { } var out struct{} - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } arg.Node = "bar" - if err := client.Call("Catalog.Register", &arg, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -448,7 +448,7 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) { Node: "foo", }, } - if err := client.Call("Health.ServiceNodes", &req, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil { t.Fatalf("err: %v", err) } nodes := out2.Nodes @@ -464,7 +464,7 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) { // Now query relative to bar to make sure it shows up first. req.Source.Node = "bar" - if err := client.Call("Health.ServiceNodes", &req, &out2); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out2); err != nil { t.Fatalf("err: %v", err) } nodes = out2.Nodes diff --git a/consul/rtt_test.go b/consul/rtt_test.go index ed8ab5c22e..7df0a7a57a 100644 --- a/consul/rtt_test.go +++ b/consul/rtt_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/coordinate" ) @@ -84,7 +85,7 @@ func verifyCheckServiceNodeSort(t *testing.T, nodes structs.CheckServiceNodes, e // | | | | | | | | | | | // 0 1 2 3 4 5 6 7 8 9 10 (ms) // -func seedCoordinates(t *testing.T, client *rpc.Client, server *Server) { +func seedCoordinates(t *testing.T, codec rpc.ClientCodec, server *Server) { updates := []structs.CoordinateUpdateRequest{ structs.CoordinateUpdateRequest{ Datacenter: "dc1", @@ -117,7 +118,7 @@ func seedCoordinates(t *testing.T, client *rpc.Client, server *Server) { // the Raft log. for _, update := range updates { var out struct{} - if err := client.Call("Coordinate.Update", &update, &out); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &update, &out); err != nil { t.Fatalf("err: %v", err) } } @@ -129,11 +130,11 @@ func TestRtt_sortNodesByDistanceFrom(t *testing.T) { defer os.RemoveAll(dir) defer server.Shutdown() - client := rpcClient(t, server) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") - seedCoordinates(t, client, server) + codec := rpcClient(t, server) + defer codec.Close() + testutil.WaitForLeader(t, server.RPC, "dc1") + seedCoordinates(t, codec, server) nodes := structs.Nodes{ structs.Node{Node: "apple"}, structs.Node{Node: "node1"}, @@ -190,11 +191,11 @@ func TestRtt_sortNodesByDistanceFrom_Nodes(t *testing.T) { defer os.RemoveAll(dir) defer server.Shutdown() - client := rpcClient(t, server) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") - seedCoordinates(t, client, server) + codec := rpcClient(t, server) + defer codec.Close() + testutil.WaitForLeader(t, server.RPC, "dc1") + seedCoordinates(t, codec, server) nodes := structs.Nodes{ structs.Node{Node: "apple"}, structs.Node{Node: "node1"}, @@ -239,11 +240,11 @@ func TestRtt_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) { defer os.RemoveAll(dir) defer server.Shutdown() - client := rpcClient(t, server) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") - seedCoordinates(t, client, server) + codec := rpcClient(t, server) + defer codec.Close() + testutil.WaitForLeader(t, server.RPC, "dc1") + seedCoordinates(t, codec, server) nodes := structs.ServiceNodes{ structs.ServiceNode{Node: "apple"}, structs.ServiceNode{Node: "node1"}, @@ -288,11 +289,11 @@ func TestRtt_sortNodesByDistanceFrom_HealthChecks(t *testing.T) { defer os.RemoveAll(dir) defer server.Shutdown() - client := rpcClient(t, server) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") - seedCoordinates(t, client, server) + codec := rpcClient(t, server) + defer codec.Close() + testutil.WaitForLeader(t, server.RPC, "dc1") + seedCoordinates(t, codec, server) checks := structs.HealthChecks{ &structs.HealthCheck{Node: "apple"}, &structs.HealthCheck{Node: "node1"}, @@ -337,11 +338,11 @@ func TestRtt_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) { defer os.RemoveAll(dir) defer server.Shutdown() - client := rpcClient(t, server) - defer client.Close() - testutil.WaitForLeader(t, client.Call, "dc1") - seedCoordinates(t, client, server) + codec := rpcClient(t, server) + defer codec.Close() + testutil.WaitForLeader(t, server.RPC, "dc1") + seedCoordinates(t, codec, server) nodes := structs.CheckServiceNodes{ structs.CheckServiceNode{Node: structs.Node{Node: "apple"}}, structs.CheckServiceNode{Node: structs.Node{Node: "node1"}},