mirror of https://github.com/status-im/consul.git
Move rpc structs into sub-package
This commit is contained in:
parent
1c5a8d01b1
commit
ea925ba5e3
|
@ -1,7 +1,7 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Catalog endpoint is used to manipulate the service catalog
|
// Catalog endpoint is used to manipulate the service catalog
|
||||||
|
@ -10,12 +10,12 @@ type Catalog struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register is used register that a node is providing a given service.
|
// Register is used register that a node is providing a given service.
|
||||||
func (c *Catalog) Register(args *rpc.RegisterRequest, reply *struct{}) error {
|
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
|
||||||
if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done {
|
if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.srv.raftApply(rpc.RegisterRequestType, args)
|
_, err := c.srv.raftApply(structs.RegisterRequestType, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.srv.logger.Printf("[ERR] Register failed: %v", err)
|
c.srv.logger.Printf("[ERR] Register failed: %v", err)
|
||||||
return err
|
return err
|
||||||
|
@ -24,12 +24,12 @@ func (c *Catalog) Register(args *rpc.RegisterRequest, reply *struct{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deregister is used to remove a service registration for a given node.
|
// Deregister is used to remove a service registration for a given node.
|
||||||
func (c *Catalog) Deregister(args *rpc.DeregisterRequest, reply *struct{}) error {
|
func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error {
|
||||||
if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done {
|
if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.srv.raftApply(rpc.DeregisterRequestType, args)
|
_, err := c.srv.raftApply(structs.DeregisterRequestType, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.srv.logger.Printf("[ERR] Deregister failed: %v", err)
|
c.srv.logger.Printf("[ERR] Deregister failed: %v", err)
|
||||||
return err
|
return err
|
||||||
|
@ -54,7 +54,7 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListNodes is used to query the nodes in a DC
|
// ListNodes is used to query the nodes in a DC
|
||||||
func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error {
|
func (c *Catalog) ListNodes(dc string, reply *structs.Nodes) error {
|
||||||
if done, err := c.srv.forward("Catalog.ListNodes", dc, dc, reply); done {
|
if done, err := c.srv.forward("Catalog.ListNodes", dc, dc, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -64,9 +64,9 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error {
|
||||||
rawNodes := state.Nodes()
|
rawNodes := state.Nodes()
|
||||||
|
|
||||||
// Format the response
|
// Format the response
|
||||||
nodes := rpc.Nodes(make([]rpc.Node, len(rawNodes)/2))
|
nodes := structs.Nodes(make([]structs.Node, len(rawNodes)/2))
|
||||||
for i := 0; i < len(rawNodes); i += 2 {
|
for i := 0; i < len(rawNodes); i += 2 {
|
||||||
nodes[i] = rpc.Node{rawNodes[i], rawNodes[i+1]}
|
nodes[i] = structs.Node{rawNodes[i], rawNodes[i+1]}
|
||||||
}
|
}
|
||||||
|
|
||||||
*reply = nodes
|
*reply = nodes
|
||||||
|
@ -74,7 +74,7 @@ func (c *Catalog) ListNodes(dc string, reply *rpc.Nodes) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListServices is used to query the services in a DC
|
// ListServices is used to query the services in a DC
|
||||||
func (c *Catalog) ListServices(dc string, reply *rpc.Services) error {
|
func (c *Catalog) ListServices(dc string, reply *structs.Services) error {
|
||||||
if done, err := c.srv.forward("Catalog.ListServices", dc, dc, reply); done {
|
if done, err := c.srv.forward("Catalog.ListServices", dc, dc, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -88,14 +88,14 @@ func (c *Catalog) ListServices(dc string, reply *rpc.Services) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceNodes returns all the nodes registered as part of a service
|
// ServiceNodes returns all the nodes registered as part of a service
|
||||||
func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.ServiceNodes) error {
|
func (c *Catalog) ServiceNodes(args *structs.ServiceNodesRequest, reply *structs.ServiceNodes) error {
|
||||||
if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done {
|
if done, err := c.srv.forward("Catalog.ServiceNodes", args.Datacenter, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the nodes
|
// Get the nodes
|
||||||
state := c.srv.fsm.State()
|
state := c.srv.fsm.State()
|
||||||
var nodes rpc.ServiceNodes
|
var nodes structs.ServiceNodes
|
||||||
if args.TagFilter {
|
if args.TagFilter {
|
||||||
nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
|
nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
|
||||||
} else {
|
} else {
|
||||||
|
@ -107,7 +107,7 @@ func (c *Catalog) ServiceNodes(args *rpc.ServiceNodesRequest, reply *rpc.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeServices returns all the services registered as part of a node
|
// NodeServices returns all the services registered as part of a node
|
||||||
func (c *Catalog) NodeServices(args *rpc.NodeServicesRequest, reply *rpc.NodeServices) error {
|
func (c *Catalog) NodeServices(args *structs.NodeServicesRequest, reply *structs.NodeServices) error {
|
||||||
if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done {
|
if done, err := c.srv.forward("Catalog.NodeServices", args.Datacenter, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,8 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
nrpc "net/rpc"
|
"net/rpc"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -17,7 +17,7 @@ func TestCatalogRegister(t *testing.T) {
|
||||||
client := rpcClient(t, s1)
|
client := rpcClient(t, s1)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
arg := rpc.RegisterRequest{
|
arg := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
|
@ -64,14 +64,14 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
// Use the follower as the client
|
// Use the follower as the client
|
||||||
var client *nrpc.Client
|
var client *rpc.Client
|
||||||
if !s1.IsLeader() {
|
if !s1.IsLeader() {
|
||||||
client = client1
|
client = client1
|
||||||
} else {
|
} else {
|
||||||
client = client2
|
client = client2
|
||||||
}
|
}
|
||||||
|
|
||||||
arg := rpc.RegisterRequest{
|
arg := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
|
@ -106,7 +106,7 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
|
||||||
// Wait for the leaders
|
// Wait for the leaders
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
arg := rpc.RegisterRequest{
|
arg := structs.RegisterRequest{
|
||||||
Datacenter: "dc2", // SHould forward through s1
|
Datacenter: "dc2", // SHould forward through s1
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
|
@ -127,7 +127,7 @@ func TestCatalogDeregister(t *testing.T) {
|
||||||
client := rpcClient(t, s1)
|
client := rpcClient(t, s1)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
arg := rpc.DeregisterRequest{
|
arg := structs.DeregisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
}
|
}
|
||||||
|
@ -191,7 +191,7 @@ func TestCatalogListNodes(t *testing.T) {
|
||||||
client := rpcClient(t, s1)
|
client := rpcClient(t, s1)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
var out rpc.Nodes
|
var out structs.Nodes
|
||||||
err := client.Call("Catalog.ListNodes", "dc1", &out)
|
err := client.Call("Catalog.ListNodes", "dc1", &out)
|
||||||
if err == nil || err.Error() != "No cluster leader" {
|
if err == nil || err.Error() != "No cluster leader" {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -225,7 +225,7 @@ func TestCatalogListServices(t *testing.T) {
|
||||||
client := rpcClient(t, s1)
|
client := rpcClient(t, s1)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
var out rpc.Services
|
var out structs.Services
|
||||||
err := client.Call("Catalog.ListServices", "dc1", &out)
|
err := client.Call("Catalog.ListServices", "dc1", &out)
|
||||||
if err == nil || err.Error() != "No cluster leader" {
|
if err == nil || err.Error() != "No cluster leader" {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -260,13 +260,13 @@ func TestCatalogListServiceNodes(t *testing.T) {
|
||||||
client := rpcClient(t, s1)
|
client := rpcClient(t, s1)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
args := rpc.ServiceNodesRequest{
|
args := structs.ServiceNodesRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
ServiceName: "db",
|
ServiceName: "db",
|
||||||
ServiceTag: "slave",
|
ServiceTag: "slave",
|
||||||
TagFilter: false,
|
TagFilter: false,
|
||||||
}
|
}
|
||||||
var out rpc.ServiceNodes
|
var out structs.ServiceNodes
|
||||||
err := client.Call("Catalog.ServiceNodes", &args, &out)
|
err := client.Call("Catalog.ServiceNodes", &args, &out)
|
||||||
if err == nil || err.Error() != "No cluster leader" {
|
if err == nil || err.Error() != "No cluster leader" {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -305,11 +305,11 @@ func TestCatalogNodeServices(t *testing.T) {
|
||||||
client := rpcClient(t, s1)
|
client := rpcClient(t, s1)
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
args := rpc.NodeServicesRequest{
|
args := structs.NodeServicesRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
}
|
}
|
||||||
var out rpc.NodeServices
|
var out structs.NodeServices
|
||||||
err := client.Call("Catalog.NodeServices", &args, &out)
|
err := client.Call("Catalog.NodeServices", &args, &out)
|
||||||
if err == nil || err.Error() != "No cluster leader" {
|
if err == nil || err.Error() != "No cluster leader" {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
|
|
@ -2,7 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/ugorji/go/codec"
|
"github.com/ugorji/go/codec"
|
||||||
"io"
|
"io"
|
||||||
|
@ -43,10 +43,10 @@ func (c *consulFSM) State() *StateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consulFSM) Apply(buf []byte) interface{} {
|
func (c *consulFSM) Apply(buf []byte) interface{} {
|
||||||
switch rpc.MessageType(buf[0]) {
|
switch structs.MessageType(buf[0]) {
|
||||||
case rpc.RegisterRequestType:
|
case structs.RegisterRequestType:
|
||||||
return c.applyRegister(buf[1:])
|
return c.applyRegister(buf[1:])
|
||||||
case rpc.DeregisterRequestType:
|
case structs.DeregisterRequestType:
|
||||||
return c.applyDeregister(buf[1:])
|
return c.applyDeregister(buf[1:])
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||||
|
@ -54,8 +54,8 @@ func (c *consulFSM) Apply(buf []byte) interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consulFSM) applyRegister(buf []byte) interface{} {
|
func (c *consulFSM) applyRegister(buf []byte) interface{} {
|
||||||
var req rpc.RegisterRequest
|
var req structs.RegisterRequest
|
||||||
if err := rpc.Decode(buf, &req); err != nil {
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,8 +70,8 @@ func (c *consulFSM) applyRegister(buf []byte) interface{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consulFSM) applyDeregister(buf []byte) interface{} {
|
func (c *consulFSM) applyDeregister(buf []byte) interface{} {
|
||||||
var req rpc.DeregisterRequest
|
var req structs.DeregisterRequest
|
||||||
if err := rpc.Decode(buf, &req); err != nil {
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,9 +122,9 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode
|
// Decode
|
||||||
switch rpc.MessageType(msgType[0]) {
|
switch structs.MessageType(msgType[0]) {
|
||||||
case rpc.RegisterRequestType:
|
case structs.RegisterRequestType:
|
||||||
var req rpc.RegisterRequest
|
var req structs.RegisterRequest
|
||||||
if err := dec.Decode(&req); err != nil {
|
if err := dec.Decode(&req); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -156,15 +156,15 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||||
encoder := codec.NewEncoder(sink, &handle)
|
encoder := codec.NewEncoder(sink, &handle)
|
||||||
|
|
||||||
// Register each node
|
// Register each node
|
||||||
var req rpc.RegisterRequest
|
var req structs.RegisterRequest
|
||||||
for i := 0; i < len(nodes); i += 2 {
|
for i := 0; i < len(nodes); i += 2 {
|
||||||
req = rpc.RegisterRequest{
|
req = structs.RegisterRequest{
|
||||||
Node: nodes[i],
|
Node: nodes[i],
|
||||||
Address: nodes[i+1],
|
Address: nodes[i+1],
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register the node itself
|
// Register the node itself
|
||||||
sink.Write([]byte{byte(rpc.RegisterRequestType)})
|
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||||
if err := encoder.Encode(&req); err != nil {
|
if err := encoder.Encode(&req); err != nil {
|
||||||
sink.Cancel()
|
sink.Cancel()
|
||||||
return err
|
return err
|
||||||
|
@ -177,7 +177,7 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||||
req.ServiceTag = props.Tag
|
req.ServiceTag = props.Tag
|
||||||
req.ServicePort = props.Port
|
req.ServicePort = props.Port
|
||||||
|
|
||||||
sink.Write([]byte{byte(rpc.RegisterRequestType)})
|
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||||
if err := encoder.Encode(&req); err != nil {
|
if err := encoder.Encode(&req); err != nil {
|
||||||
sink.Cancel()
|
sink.Cancel()
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -2,7 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,12 +30,12 @@ func TestFSM_RegisterNode(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := rpc.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
}
|
}
|
||||||
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -63,7 +63,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := rpc.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
|
@ -71,7 +71,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
|
||||||
ServiceTag: "master",
|
ServiceTag: "master",
|
||||||
ServicePort: 8000,
|
ServicePort: 8000,
|
||||||
}
|
}
|
||||||
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ func TestFSM_DeregisterService(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := rpc.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
|
@ -107,7 +107,7 @@ func TestFSM_DeregisterService(t *testing.T) {
|
||||||
ServiceTag: "master",
|
ServiceTag: "master",
|
||||||
ServicePort: 8000,
|
ServicePort: 8000,
|
||||||
}
|
}
|
||||||
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -117,12 +117,12 @@ func TestFSM_DeregisterService(t *testing.T) {
|
||||||
t.Fatalf("resp: %v", resp)
|
t.Fatalf("resp: %v", resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
dereg := rpc.DeregisterRequest{
|
dereg := structs.DeregisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
ServiceName: "db",
|
ServiceName: "db",
|
||||||
}
|
}
|
||||||
buf, err = rpc.Encode(rpc.DeregisterRequestType, dereg)
|
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -150,7 +150,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := rpc.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
Address: "127.0.0.1",
|
Address: "127.0.0.1",
|
||||||
|
@ -158,7 +158,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
|
||||||
ServiceTag: "master",
|
ServiceTag: "master",
|
||||||
ServicePort: 8000,
|
ServicePort: 8000,
|
||||||
}
|
}
|
||||||
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -168,11 +168,11 @@ func TestFSM_DeregisterNode(t *testing.T) {
|
||||||
t.Fatalf("resp: %v", resp)
|
t.Fatalf("resp: %v", resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
dereg := rpc.DeregisterRequest{
|
dereg := structs.DeregisterRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
}
|
}
|
||||||
buf, err = rpc.Encode(rpc.DeregisterRequestType, dereg)
|
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/ugorji/go/codec"
|
"github.com/ugorji/go/codec"
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
@ -105,7 +105,7 @@ func (s *Server) forward(method, dc string, args interface{}, reply interface{})
|
||||||
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
|
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
|
||||||
leader := s.raft.Leader()
|
leader := s.raft.Leader()
|
||||||
if leader == nil {
|
if leader == nil {
|
||||||
return rpc.ErrNoLeader
|
return structs.ErrNoLeader
|
||||||
}
|
}
|
||||||
return s.connPool.RPC(leader, method, args, reply)
|
return s.connPool.RPC(leader, method, args, reply)
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
|
||||||
servers := s.remoteConsuls[dc]
|
servers := s.remoteConsuls[dc]
|
||||||
if len(servers) == 0 {
|
if len(servers) == 0 {
|
||||||
s.remoteLock.RUnlock()
|
s.remoteLock.RUnlock()
|
||||||
return rpc.ErrNoDCPath
|
return structs.ErrNoDCPath
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select a random addr
|
// Select a random addr
|
||||||
|
@ -131,8 +131,8 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
|
||||||
|
|
||||||
// raftApply is used to encode a message, run it through raft, and return
|
// raftApply is used to encode a message, run it through raft, and return
|
||||||
// the FSM response along with any errors
|
// the FSM response along with any errors
|
||||||
func (s *Server) raftApply(t rpc.MessageType, msg interface{}) (interface{}, error) {
|
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||||
buf, err := rpc.Encode(t, msg)
|
buf, err := structs.Encode(t, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to encode request: %v", err)
|
return nil, fmt.Errorf("Failed to encode request: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,15 +4,15 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/armon/gomdb"
|
"github.com/armon/gomdb"
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
dbNodes = "nodes" // Maps node -> addr
|
dbNodes = "nodes" // Maps node -> addr
|
||||||
dbServices = "services" // Maps node||serv -> rpc.NodeService
|
dbServices = "services" // Maps node||serv -> structs.NodeService
|
||||||
dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> rpc.ServiceNode
|
dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> structs.ServiceNode
|
||||||
)
|
)
|
||||||
|
|
||||||
// The StateStore is responsible for maintaining all the Consul
|
// The StateStore is responsible for maintaining all the Consul
|
||||||
|
@ -207,11 +207,11 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
|
||||||
|
|
||||||
// Update the service entry
|
// Update the service entry
|
||||||
key := []byte(fmt.Sprintf("%s||%s", name, service))
|
key := []byte(fmt.Sprintf("%s||%s", name, service))
|
||||||
nService := rpc.NodeService{
|
nService := structs.NodeService{
|
||||||
Tag: tag,
|
Tag: tag,
|
||||||
Port: port,
|
Port: port,
|
||||||
}
|
}
|
||||||
val, err := rpc.Encode(255, &nService)
|
val, err := structs.Encode(255, &nService)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Abort()
|
tx.Abort()
|
||||||
return err
|
return err
|
||||||
|
@ -232,13 +232,13 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
|
||||||
|
|
||||||
// Update the index entry
|
// Update the index entry
|
||||||
key = []byte(fmt.Sprintf("%s||%s||%s", service, tag, name))
|
key = []byte(fmt.Sprintf("%s||%s||%s", service, tag, name))
|
||||||
node := rpc.ServiceNode{
|
node := structs.ServiceNode{
|
||||||
Node: name,
|
Node: name,
|
||||||
Address: string(addr),
|
Address: string(addr),
|
||||||
ServiceTag: tag,
|
ServiceTag: tag,
|
||||||
ServicePort: port,
|
ServicePort: port,
|
||||||
}
|
}
|
||||||
val, err = rpc.Encode(255, &node)
|
val, err = structs.Encode(255, &node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Abort()
|
tx.Abort()
|
||||||
return err
|
return err
|
||||||
|
@ -252,7 +252,7 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeServices is used to return all the services of a given node
|
// NodeServices is used to return all the services of a given node
|
||||||
func (s *StateStore) NodeServices(name string) rpc.NodeServices {
|
func (s *StateStore) NodeServices(name string) structs.NodeServices {
|
||||||
tx, dbis, err := s.startTxn(true, dbServices)
|
tx, dbis, err := s.startTxn(true, dbServices)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
||||||
|
@ -262,22 +262,22 @@ func (s *StateStore) NodeServices(name string) rpc.NodeServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterNodeServices is used to filter the services to a specific node
|
// filterNodeServices is used to filter the services to a specific node
|
||||||
func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) rpc.NodeServices {
|
func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) structs.NodeServices {
|
||||||
keyPrefix := []byte(fmt.Sprintf("%s||", name))
|
keyPrefix := []byte(fmt.Sprintf("%s||", name))
|
||||||
return parseNodeServices(tx, services, keyPrefix)
|
return parseNodeServices(tx, services, keyPrefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseNodeServices is used to parse the results of a queryNodeServices
|
// parseNodeServices is used to parse the results of a queryNodeServices
|
||||||
func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) rpc.NodeServices {
|
func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) structs.NodeServices {
|
||||||
// Create the cursor
|
// Create the cursor
|
||||||
cursor, err := tx.CursorOpen(dbi)
|
cursor, err := tx.CursorOpen(dbi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
services := rpc.NodeServices(make(map[string]rpc.NodeService))
|
services := structs.NodeServices(make(map[string]structs.NodeService))
|
||||||
var service string
|
var service string
|
||||||
var entry rpc.NodeService
|
var entry structs.NodeService
|
||||||
var key, val []byte
|
var key, val []byte
|
||||||
first := true
|
first := true
|
||||||
|
|
||||||
|
@ -307,7 +307,7 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) rpc.NodeServices
|
||||||
if val[0] != 255 {
|
if val[0] != 255 {
|
||||||
panic(fmt.Errorf("Bad service value: %v", val))
|
panic(fmt.Errorf("Bad service value: %v", val))
|
||||||
}
|
}
|
||||||
if err := rpc.Decode(val[1:], &entry); err != nil {
|
if err := structs.Decode(val[1:], &entry); err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
panic(fmt.Errorf("Failed to get node services: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,7 +430,7 @@ func (s *StateStore) Services() map[string][]string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceNodes returns the nodes associated with a given service
|
// ServiceNodes returns the nodes associated with a given service
|
||||||
func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes {
|
func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes {
|
||||||
tx, dbis, err := s.startTxn(false, dbServiceIndex)
|
tx, dbis, err := s.startTxn(false, dbServiceIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
||||||
|
@ -441,7 +441,7 @@ func (s *StateStore) ServiceNodes(service string) rpc.ServiceNodes {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceTagNodes returns the nodes associated with a given service matching a tag
|
// ServiceTagNodes returns the nodes associated with a given service matching a tag
|
||||||
func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes {
|
func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes {
|
||||||
tx, dbis, err := s.startTxn(false, dbServiceIndex)
|
tx, dbis, err := s.startTxn(false, dbServiceIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
||||||
|
@ -452,14 +452,14 @@ func (s *StateStore) ServiceTagNodes(service, tag string) rpc.ServiceNodes {
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
|
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
|
||||||
func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) rpc.ServiceNodes {
|
func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) structs.ServiceNodes {
|
||||||
cursor, err := tx.CursorOpen(index)
|
cursor, err := tx.CursorOpen(index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
panic(fmt.Errorf("Failed to get node services: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
var nodes rpc.ServiceNodes
|
var nodes structs.ServiceNodes
|
||||||
var node rpc.ServiceNode
|
var node structs.ServiceNode
|
||||||
for {
|
for {
|
||||||
key, val, err := cursor.Get(nil, mdb.NEXT)
|
key, val, err := cursor.Get(nil, mdb.NEXT)
|
||||||
if err == mdb.NotFound {
|
if err == mdb.NotFound {
|
||||||
|
@ -477,7 +477,7 @@ func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) rpc.ServiceNod
|
||||||
if val[0] != 255 {
|
if val[0] != 255 {
|
||||||
panic(fmt.Errorf("Bad service value: %v", val))
|
panic(fmt.Errorf("Bad service value: %v", val))
|
||||||
}
|
}
|
||||||
if err := rpc.Decode(val[1:], &node); err != nil {
|
if err := structs.Decode(val[1:], &node); err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
panic(fmt.Errorf("Failed to get node services: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -525,7 +525,7 @@ func (s *StateSnapshot) Nodes() []string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeServices is used to return all the services of a given node
|
// NodeServices is used to return all the services of a given node
|
||||||
func (s *StateSnapshot) NodeServices(name string) rpc.NodeServices {
|
func (s *StateSnapshot) NodeServices(name string) structs.NodeServices {
|
||||||
return filterNodeServices(s.tx, s.dbis[1], name)
|
return filterNodeServices(s.tx, s.dbis[1], name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package rpc
|
package structs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
Loading…
Reference in New Issue