consul: starting work on some RPC layers

This commit is contained in:
Armon Dadgar 2013-12-11 14:04:44 -08:00
parent eb428df842
commit 3b8888bdc8
6 changed files with 180 additions and 8 deletions

View File

@ -0,0 +1,49 @@
package consul
import (
"github.com/hashicorp/consul/rpc"
)
// Catalog endpoint is used to manipulate the service catalog
type Catalog struct {
*Server
}
/*
* Register : Registers that a node provides a given service
* Deregister : Deregisters that a node provides a given service
* RemoveNode: Used to force remove a node
* ListDatacenters: List the known datacenters
* ListServices : Lists the available services
* ListNodes : Lists the available nodes
* ServiceNodes: Returns the nodes that are part of a service
* NodeServices: Returns the services that a node is registered for
*/
// Register is used register that a node is providing a given service.
// Returns true if the entry was added, false if it already exists, or
// an error is returned.
func (c *Catalog) Register(args *rpc.RegisterRequest, reply *bool) error {
if done, err := c.forward("Catalog.Register", args.Datacenter, args, reply); done {
return err
}
// Run it through raft
resp, err := c.raftApply(rpc.RegisterRequestType, args)
if err != nil {
c.logger.Printf("[ERR] Register failed: %v", err)
return err
}
// Set the response
*reply = resp.(bool)
return nil
}
// Deregister is used to remove a service registration for a given node.
// Returns true if the entry was removed, false if it doesn't exist or
// an error is returned.
func (c *Catalog) Deregister(args *rpc.DeregisterRequest, reply *bool) error {
return nil
}

View File

@ -31,10 +31,8 @@ The catalog service is used to manage service discovery and registration.
Nodes can register the services they provide, and deregister them later.
The service exposes the following methods:
* Register : Registers that a node provides a given service
* Deregister : Deregisters that a node provides a given service
* RemoveFailedNode: Used to force remove a failed node
* Register : Registers a node, and potentially a node service
* Deregister : Deregisters a node, and potentially a node service
* ListDatacenters: List the known datacenters
* ListServices : Lists the available services

View File

@ -1,6 +1,8 @@
package consul
import (
"fmt"
"github.com/hashicorp/consul/rpc"
"github.com/hashicorp/raft"
"io"
)
@ -32,11 +34,22 @@ func NewFSM() (*consulFSM, error) {
return fsm, nil
}
func (c *consulFSM) Apply([]byte) interface{} {
// TODO: Decode
func (c *consulFSM) Apply(buf []byte) interface{} {
switch rpc.MessageType(buf[0]) {
case rpc.RegisterRequestType:
return c.applyRegister(buf[1:])
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
}
// TODO: Execute
return nil
func (c *consulFSM) applyRegister(buf []byte) interface{} {
var req rpc.RegisterRequest
if err := rpc.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return true
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {

View File

@ -1,6 +1,8 @@
package consul
import (
"fmt"
"github.com/hashicorp/consul/rpc"
"github.com/ugorji/go/codec"
"net"
)
@ -77,3 +79,51 @@ func (s *Server) handleConsulConn(conn net.Conn) {
}
}
}
// forward is used to forward to a remote DC or to forward to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) forward(method, dc string, args interface{}, reply interface{}) (bool, error) {
// Handle DC forwarding
if dc != s.config.Datacenter {
err := s.forwardDC(method, dc, args, reply)
return true, err
}
// Handle leader forwarding
if !s.IsLeader() {
err := s.forwardLeader(method, args, reply)
return true, err
}
return false, nil
}
// forwardLeader is used to forward an RPC call to the leader, or fail if no leader
func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error {
leader := s.raft.Leader()
if leader == nil {
return rpc.ErrNoLeader
}
return s.connPool.RPC(leader, method, args, reply)
}
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
// TODO: Fix
return fmt.Errorf("DC forwarding not supported")
}
// raftApply is used to encode a message, run it through raft, and return
// the FSM response along with any errors
func (s *Server) raftApply(t rpc.MessageType, msg interface{}) (interface{}, error) {
buf, err := rpc.Encode(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
future := s.raft.Apply(buf, 0)
if err := future.Error(); err != nil {
return nil, err
}
return future.Response(), nil
}

View File

@ -210,6 +210,7 @@ func (s *Server) setupRPC() error {
// Register the handlers
s.rpcServer.Register(&Status{server: s})
s.rpcServer.Register(&Raft{server: s})
s.rpcServer.Register(&Catalog{s})
list, err := net.Listen("tcp", s.config.RPCAddr)
if err != nil {
@ -352,3 +353,8 @@ func (s *Server) RemoveFailedNode(node string) error {
}
return nil
}
// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
return s.raft.State() == raft.Leader
}

56
rpc/structs.go Normal file
View File

@ -0,0 +1,56 @@
package rpc
import (
"bytes"
"fmt"
"github.com/ugorji/go/codec"
)
var (
ErrNoLeader = fmt.Errorf("No cluster leader")
)
type MessageType uint8
const (
RegisterRequestType MessageType = iota
DeregisterRequestType
)
// RegisterRequest is used for the Catalog.Register endpoint
// to register a node as providing a service. If no service
// is provided, the node is registered.
type RegisterRequest struct {
Datacenter string
Node string
Address string
ServiceName string
ServicePort int
ServiceTag string
}
// DeregisterRequest is used for the Catalog.Deregister endpoint
// to deregister a node as providing a service. If no service is
// provided the entire node is deregistered.
type DeregisterRequest struct {
Datacenter string
Node string
ServiceName string
}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle
return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
}
// Encode is used to encode a MsgPack object with type prefix
func Encode(t MessageType, msg interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(t))
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
err := encoder.Encode(msg)
return buf.Bytes(), err
}