mirror of https://github.com/status-im/consul.git
545 lines
12 KiB
Go
545 lines
12 KiB
Go
|
package agent
|
||
|
|
||
|
/*
|
||
|
The agent exposes an RPC mechanism that is used for both controlling
|
||
|
Consul as well as providing a fast streaming mechanism for events. This
|
||
|
allows other applications to easily leverage Consul without embedding.
|
||
|
|
||
|
We additionally make use of the RPC layer to also handle calls from
|
||
|
the CLI to unify the code paths. This results in a split Request/Response
|
||
|
as well as streaming mode of operation.
|
||
|
|
||
|
The system is fairly simple, each client opens a TCP connection to the
|
||
|
agent. The connection is initialized with a handshake which establishes
|
||
|
the protocol version being used. This is to allow for future changes to
|
||
|
the protocol.
|
||
|
|
||
|
Once initialized, clients send commands and wait for responses. Certain
|
||
|
commands will cause the client to subscribe to events, and those will be
|
||
|
pushed down the socket as they are received. This provides a low-latency
|
||
|
mechanism for applications to send and receive events, while also providing
|
||
|
a flexible control mechanism for Consul.
|
||
|
*/
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"fmt"
|
||
|
"github.com/hashicorp/logutils"
|
||
|
"github.com/hashicorp/serf/serf"
|
||
|
"github.com/ugorji/go/codec"
|
||
|
"io"
|
||
|
"log"
|
||
|
"net"
|
||
|
"os"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
MinRPCVersion = 1
|
||
|
MaxRPCVersion = 1
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
handshakeCommand = "handshake"
|
||
|
forceLeaveCommand = "force-leave"
|
||
|
joinCommand = "join"
|
||
|
membersLANCommand = "members-lan"
|
||
|
membersWANCommand = "members-wan"
|
||
|
stopCommand = "stop"
|
||
|
monitorCommand = "monitor"
|
||
|
leaveCommand = "leave"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
unsupportedCommand = "Unsupported command"
|
||
|
unsupportedRPCVersion = "Unsupported RPC version"
|
||
|
duplicateHandshake = "Handshake already performed"
|
||
|
handshakeRequired = "Handshake required"
|
||
|
monitorExists = "Monitor already exists"
|
||
|
)
|
||
|
|
||
|
// Request header is sent before each request
|
||
|
type requestHeader struct {
|
||
|
Command string
|
||
|
Seq uint64
|
||
|
}
|
||
|
|
||
|
// Response header is sent before each response
|
||
|
type responseHeader struct {
|
||
|
Seq uint64
|
||
|
Error string
|
||
|
}
|
||
|
|
||
|
type handshakeRequest struct {
|
||
|
Version int32
|
||
|
}
|
||
|
|
||
|
type eventRequest struct {
|
||
|
Name string
|
||
|
Payload []byte
|
||
|
Coalesce bool
|
||
|
}
|
||
|
|
||
|
type forceLeaveRequest struct {
|
||
|
Node string
|
||
|
}
|
||
|
|
||
|
type joinRequest struct {
|
||
|
Existing []string
|
||
|
WAN bool
|
||
|
}
|
||
|
|
||
|
type joinResponse struct {
|
||
|
Num int32
|
||
|
}
|
||
|
|
||
|
type membersResponse struct {
|
||
|
Members []Member
|
||
|
}
|
||
|
|
||
|
type monitorRequest struct {
|
||
|
LogLevel string
|
||
|
}
|
||
|
|
||
|
type streamRequest struct {
|
||
|
Type string
|
||
|
}
|
||
|
|
||
|
type stopRequest struct {
|
||
|
Stop uint64
|
||
|
}
|
||
|
|
||
|
type logRecord struct {
|
||
|
Log string
|
||
|
}
|
||
|
|
||
|
type userEventRecord struct {
|
||
|
Event string
|
||
|
LTime serf.LamportTime
|
||
|
Name string
|
||
|
Payload []byte
|
||
|
Coalesce bool
|
||
|
}
|
||
|
|
||
|
type Member struct {
|
||
|
Name string
|
||
|
Addr net.IP
|
||
|
Port uint16
|
||
|
Role string
|
||
|
Status string
|
||
|
ProtocolMin uint8
|
||
|
ProtocolMax uint8
|
||
|
ProtocolCur uint8
|
||
|
DelegateMin uint8
|
||
|
DelegateMax uint8
|
||
|
DelegateCur uint8
|
||
|
}
|
||
|
|
||
|
type memberEventRecord struct {
|
||
|
Event string
|
||
|
Members []Member
|
||
|
}
|
||
|
|
||
|
type AgentRPC struct {
|
||
|
sync.Mutex
|
||
|
agent *Agent
|
||
|
clients map[string]*rpcClient
|
||
|
listener net.Listener
|
||
|
logger *log.Logger
|
||
|
logWriter *logWriter
|
||
|
stop bool
|
||
|
stopCh chan struct{}
|
||
|
}
|
||
|
|
||
|
type rpcClient struct {
|
||
|
name string
|
||
|
conn net.Conn
|
||
|
reader *bufio.Reader
|
||
|
writer *bufio.Writer
|
||
|
dec *codec.Decoder
|
||
|
enc *codec.Encoder
|
||
|
writeLock sync.Mutex
|
||
|
version int32 // From the handshake, 0 before
|
||
|
logStreamer *logStream
|
||
|
}
|
||
|
|
||
|
// send is used to send an object using the MsgPack encoding. send
|
||
|
// is serialized to prevent write overlaps, while properly buffering.
|
||
|
func (c *rpcClient) Send(header *responseHeader, obj interface{}) error {
|
||
|
c.writeLock.Lock()
|
||
|
defer c.writeLock.Unlock()
|
||
|
|
||
|
if err := c.enc.Encode(header); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if obj != nil {
|
||
|
if err := c.enc.Encode(obj); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := c.writer.Flush(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *rpcClient) String() string {
|
||
|
return fmt.Sprintf("rpc.client: %v", c.conn)
|
||
|
}
|
||
|
|
||
|
// NewAgentRPC is used to create a new Agent RPC handler
|
||
|
func NewAgentRPC(agent *Agent, listener net.Listener,
|
||
|
logOutput io.Writer, logWriter *logWriter) *AgentRPC {
|
||
|
if logOutput == nil {
|
||
|
logOutput = os.Stderr
|
||
|
}
|
||
|
rpc := &AgentRPC{
|
||
|
agent: agent,
|
||
|
clients: make(map[string]*rpcClient),
|
||
|
listener: listener,
|
||
|
logger: log.New(logOutput, "", log.LstdFlags),
|
||
|
logWriter: logWriter,
|
||
|
stopCh: make(chan struct{}),
|
||
|
}
|
||
|
go rpc.listen()
|
||
|
return rpc
|
||
|
}
|
||
|
|
||
|
// Shutdown is used to shutdown the RPC layer
|
||
|
func (i *AgentRPC) Shutdown() {
|
||
|
i.Lock()
|
||
|
defer i.Unlock()
|
||
|
|
||
|
if i.stop {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
i.stop = true
|
||
|
close(i.stopCh)
|
||
|
i.listener.Close()
|
||
|
|
||
|
// Close the existing connections
|
||
|
for _, client := range i.clients {
|
||
|
client.conn.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// listen is a long running routine that listens for new clients
|
||
|
func (i *AgentRPC) listen() {
|
||
|
for {
|
||
|
conn, err := i.listener.Accept()
|
||
|
if err != nil {
|
||
|
if i.stop {
|
||
|
return
|
||
|
}
|
||
|
i.logger.Printf("[ERR] agent.rpc: Failed to accept client: %v", err)
|
||
|
continue
|
||
|
}
|
||
|
i.logger.Printf("[INFO] agent.rpc: Accepted client: %v", conn.RemoteAddr())
|
||
|
|
||
|
// Wrap the connection in a client
|
||
|
client := &rpcClient{
|
||
|
name: conn.RemoteAddr().String(),
|
||
|
conn: conn,
|
||
|
reader: bufio.NewReader(conn),
|
||
|
writer: bufio.NewWriter(conn),
|
||
|
}
|
||
|
client.dec = codec.NewDecoder(client.reader,
|
||
|
&codec.MsgpackHandle{RawToString: true, WriteExt: true})
|
||
|
client.enc = codec.NewEncoder(client.writer,
|
||
|
&codec.MsgpackHandle{RawToString: true, WriteExt: true})
|
||
|
if err != nil {
|
||
|
i.logger.Printf("[ERR] agent.rpc: Failed to create decoder: %v", err)
|
||
|
conn.Close()
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// Register the client
|
||
|
i.Lock()
|
||
|
if !i.stop {
|
||
|
i.clients[client.name] = client
|
||
|
go i.handleClient(client)
|
||
|
} else {
|
||
|
conn.Close()
|
||
|
}
|
||
|
i.Unlock()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// deregisterClient is called to cleanup after a client disconnects
|
||
|
func (i *AgentRPC) deregisterClient(client *rpcClient) {
|
||
|
// Close the socket
|
||
|
client.conn.Close()
|
||
|
|
||
|
// Remove from the clients list
|
||
|
i.Lock()
|
||
|
delete(i.clients, client.name)
|
||
|
i.Unlock()
|
||
|
|
||
|
// Remove from the log writer
|
||
|
if client.logStreamer != nil {
|
||
|
i.logWriter.DeregisterHandler(client.logStreamer)
|
||
|
client.logStreamer.Stop()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// handleClient is a long running routine that handles a single client
|
||
|
func (i *AgentRPC) handleClient(client *rpcClient) {
|
||
|
defer i.deregisterClient(client)
|
||
|
var reqHeader requestHeader
|
||
|
for {
|
||
|
// Decode the header
|
||
|
if err := client.dec.Decode(&reqHeader); err != nil {
|
||
|
if err != io.EOF && !i.stop {
|
||
|
i.logger.Printf("[ERR] agent.rpc: failed to decode request header: %v", err)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Evaluate the command
|
||
|
if err := i.handleRequest(client, &reqHeader); err != nil {
|
||
|
i.logger.Printf("[ERR] agent.rpc: Failed to evaluate request: %v", err)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// handleRequest is used to evaluate a single client command
|
||
|
func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) error {
|
||
|
// Look for a command field
|
||
|
command := reqHeader.Command
|
||
|
seq := reqHeader.Seq
|
||
|
|
||
|
// Ensure the handshake is performed before other commands
|
||
|
if command != handshakeCommand && client.version == 0 {
|
||
|
respHeader := responseHeader{Seq: seq, Error: handshakeRequired}
|
||
|
client.Send(&respHeader, nil)
|
||
|
return fmt.Errorf(handshakeRequired)
|
||
|
}
|
||
|
|
||
|
// Dispatch command specific handlers
|
||
|
switch command {
|
||
|
case handshakeCommand:
|
||
|
return i.handleHandshake(client, seq)
|
||
|
|
||
|
case membersLANCommand:
|
||
|
return i.handleMembersLAN(client, seq)
|
||
|
|
||
|
case membersWANCommand:
|
||
|
return i.handleMembersWAN(client, seq)
|
||
|
|
||
|
case monitorCommand:
|
||
|
return i.handleMonitor(client, seq)
|
||
|
|
||
|
case stopCommand:
|
||
|
return i.handleStop(client, seq)
|
||
|
|
||
|
case forceLeaveCommand:
|
||
|
return i.handleForceLeave(client, seq)
|
||
|
|
||
|
case joinCommand:
|
||
|
return i.handleJoin(client, seq)
|
||
|
|
||
|
case leaveCommand:
|
||
|
return i.handleLeave(client, seq)
|
||
|
|
||
|
default:
|
||
|
respHeader := responseHeader{Seq: seq, Error: unsupportedCommand}
|
||
|
client.Send(&respHeader, nil)
|
||
|
return fmt.Errorf("command '%s' not recognized", command)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleHandshake(client *rpcClient, seq uint64) error {
|
||
|
var req handshakeRequest
|
||
|
if err := client.dec.Decode(&req); err != nil {
|
||
|
return fmt.Errorf("decode failed: %v", err)
|
||
|
}
|
||
|
|
||
|
resp := responseHeader{
|
||
|
Seq: seq,
|
||
|
Error: "",
|
||
|
}
|
||
|
|
||
|
// Check the version
|
||
|
if req.Version < MinRPCVersion || req.Version > MaxRPCVersion {
|
||
|
resp.Error = unsupportedRPCVersion
|
||
|
} else if client.version != 0 {
|
||
|
resp.Error = duplicateHandshake
|
||
|
} else {
|
||
|
client.version = req.Version
|
||
|
}
|
||
|
return client.Send(&resp, nil)
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleForceLeave(client *rpcClient, seq uint64) error {
|
||
|
var req forceLeaveRequest
|
||
|
if err := client.dec.Decode(&req); err != nil {
|
||
|
return fmt.Errorf("decode failed: %v", err)
|
||
|
}
|
||
|
|
||
|
// Attempt leave
|
||
|
err := i.agent.ForceLeave(req.Node)
|
||
|
|
||
|
// Respond
|
||
|
resp := responseHeader{
|
||
|
Seq: seq,
|
||
|
Error: errToString(err),
|
||
|
}
|
||
|
return client.Send(&resp, nil)
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleJoin(client *rpcClient, seq uint64) error {
|
||
|
var req joinRequest
|
||
|
if err := client.dec.Decode(&req); err != nil {
|
||
|
return fmt.Errorf("decode failed: %v", err)
|
||
|
}
|
||
|
|
||
|
// Attempt the join
|
||
|
var num int
|
||
|
var err error
|
||
|
if req.WAN {
|
||
|
num, err = i.agent.JoinWAN(req.Existing)
|
||
|
} else {
|
||
|
num, err = i.agent.JoinLAN(req.Existing)
|
||
|
}
|
||
|
|
||
|
// Respond
|
||
|
header := responseHeader{
|
||
|
Seq: seq,
|
||
|
Error: errToString(err),
|
||
|
}
|
||
|
resp := joinResponse{
|
||
|
Num: int32(num),
|
||
|
}
|
||
|
return client.Send(&header, &resp)
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleMembersLAN(client *rpcClient, seq uint64) error {
|
||
|
raw := i.agent.LANMembers()
|
||
|
return formatMembers(raw, client, seq)
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleMembersWAN(client *rpcClient, seq uint64) error {
|
||
|
raw := i.agent.WANMembers()
|
||
|
return formatMembers(raw, client, seq)
|
||
|
}
|
||
|
|
||
|
func formatMembers(raw []serf.Member, client *rpcClient, seq uint64) error {
|
||
|
members := make([]Member, 0, len(raw))
|
||
|
for _, m := range raw {
|
||
|
sm := Member{
|
||
|
Name: m.Name,
|
||
|
Addr: m.Addr,
|
||
|
Port: m.Port,
|
||
|
Role: m.Role,
|
||
|
Status: m.Status.String(),
|
||
|
ProtocolMin: m.ProtocolMin,
|
||
|
ProtocolMax: m.ProtocolMax,
|
||
|
ProtocolCur: m.ProtocolCur,
|
||
|
DelegateMin: m.DelegateMin,
|
||
|
DelegateMax: m.DelegateMax,
|
||
|
DelegateCur: m.DelegateCur,
|
||
|
}
|
||
|
members = append(members, sm)
|
||
|
}
|
||
|
|
||
|
header := responseHeader{
|
||
|
Seq: seq,
|
||
|
Error: "",
|
||
|
}
|
||
|
resp := membersResponse{
|
||
|
Members: members,
|
||
|
}
|
||
|
return client.Send(&header, &resp)
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleMonitor(client *rpcClient, seq uint64) error {
|
||
|
var req monitorRequest
|
||
|
if err := client.dec.Decode(&req); err != nil {
|
||
|
return fmt.Errorf("decode failed: %v", err)
|
||
|
}
|
||
|
|
||
|
resp := responseHeader{
|
||
|
Seq: seq,
|
||
|
Error: "",
|
||
|
}
|
||
|
|
||
|
// Upper case the log level
|
||
|
req.LogLevel = strings.ToUpper(req.LogLevel)
|
||
|
|
||
|
// Create a level filter
|
||
|
filter := LevelFilter()
|
||
|
filter.MinLevel = logutils.LogLevel(req.LogLevel)
|
||
|
if !ValidateLevelFilter(filter.MinLevel, filter) {
|
||
|
resp.Error = fmt.Sprintf("Unknown log level: %s", filter.MinLevel)
|
||
|
goto SEND
|
||
|
}
|
||
|
|
||
|
// Check if there is an existing monitor
|
||
|
if client.logStreamer != nil {
|
||
|
resp.Error = monitorExists
|
||
|
goto SEND
|
||
|
}
|
||
|
|
||
|
// Create a log streamer
|
||
|
client.logStreamer = newLogStream(client, filter, seq, i.logger)
|
||
|
|
||
|
// Register with the log writer. Defer so that we can respond before
|
||
|
// registration, avoids any possible race condition
|
||
|
defer i.logWriter.RegisterHandler(client.logStreamer)
|
||
|
|
||
|
SEND:
|
||
|
return client.Send(&resp, nil)
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error {
|
||
|
var req stopRequest
|
||
|
if err := client.dec.Decode(&req); err != nil {
|
||
|
return fmt.Errorf("decode failed: %v", err)
|
||
|
}
|
||
|
|
||
|
// Remove a log monitor if any
|
||
|
if client.logStreamer != nil && client.logStreamer.seq == req.Stop {
|
||
|
i.logWriter.DeregisterHandler(client.logStreamer)
|
||
|
client.logStreamer.Stop()
|
||
|
client.logStreamer = nil
|
||
|
}
|
||
|
|
||
|
// Always succeed
|
||
|
resp := responseHeader{Seq: seq, Error: ""}
|
||
|
return client.Send(&resp, nil)
|
||
|
}
|
||
|
|
||
|
func (i *AgentRPC) handleLeave(client *rpcClient, seq uint64) error {
|
||
|
i.logger.Printf("[INFO] agent.rpc: Graceful leave triggered")
|
||
|
|
||
|
// Do the leave
|
||
|
err := i.agent.Leave()
|
||
|
if err != nil {
|
||
|
i.logger.Printf("[ERR] agent.rpc: leave failed: %v", err)
|
||
|
}
|
||
|
resp := responseHeader{Seq: seq, Error: errToString(err)}
|
||
|
|
||
|
// Send and wait
|
||
|
err = client.Send(&resp, nil)
|
||
|
|
||
|
// Trigger a shutdown!
|
||
|
if err := i.agent.Shutdown(); err != nil {
|
||
|
i.logger.Printf("[ERR] agent.rpc: shutdown failed: %v", err)
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Used to convert an error to a string representation
|
||
|
func errToString(err error) string {
|
||
|
if err == nil {
|
||
|
return ""
|
||
|
}
|
||
|
return err.Error()
|
||
|
}
|