command/connect/proxy: register monitor and -register flag

This commit is contained in:
Mitchell Hashimoto 2018-05-20 10:04:29 -07:00
parent 1db42050bd
commit db10240bea
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
2 changed files with 349 additions and 8 deletions

View File

@ -57,6 +57,7 @@ type cmd struct {
upstreams map[string]proxyImpl.UpstreamConfig
listen string
register bool
registerId string
// test flags
testNoStart bool // don't start the proxy, just exit 0
@ -104,6 +105,9 @@ func (c *cmd) init() {
"Self-register with the local Consul agent. Only useful with "+
"-listen.")
c.flags.StringVar(&c.registerId, "register-id", "",
"ID suffix for the service. Use this to disambiguate with other proxies.")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
flags.Merge(c.flags, c.http.ServerFlags())
@ -178,6 +182,18 @@ func (c *cmd) Run(args []string) int {
p.Close()
}()
// Register the service if we requested it
if c.register {
monitor, err := c.registerMonitor(client)
if err != nil {
c.UI.Error(fmt.Sprintf("Failed initializing registration: %s", err))
return 1
}
go monitor.Run()
defer monitor.Close()
}
c.UI.Info("")
c.UI.Output("Log data will now stream in as it occurs:\n")
logGate.Flush()
@ -243,12 +259,7 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
// Parse out our listener if we have one
var listener proxyImpl.PublicListenerConfig
if c.listen != "" {
host, portRaw, err := net.SplitHostPort(c.listen)
if err != nil {
return nil, err
}
port, err := strconv.ParseInt(portRaw, 0, 0)
host, port, err := c.listenParts()
if err != nil {
return nil, err
}
@ -259,9 +270,9 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
"knows the backend service address.")
}
c.UI.Info(fmt.Sprintf(" Public listener: %s:%d => %s", host, int(port), c.serviceAddr))
c.UI.Info(fmt.Sprintf(" Public listener: %s:%d => %s", host, port, c.serviceAddr))
listener.BindAddress = host
listener.BindPort = int(port)
listener.BindPort = port
listener.LocalServiceAddress = c.serviceAddr
} else {
c.UI.Info(fmt.Sprintf(" Public listener: Disabled"))
@ -274,6 +285,43 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
}), nil
}
// registerMonitor returns the registration monitor ready to be started.
func (c *cmd) registerMonitor(client *api.Client) (*RegisterMonitor, error) {
if c.service == "" || c.listen == "" {
return nil, fmt.Errorf("-register may only be specified with -service and -listen")
}
host, port, err := c.listenParts()
if err != nil {
return nil, err
}
m := NewRegisterMonitor()
m.Logger = c.logger
m.Client = client
m.Service = c.service
m.IDSuffix = c.registerId
m.LocalAddress = host
m.LocalPort = port
return m, nil
}
// listenParts returns the host and port parts of the -listen flag. The
// -listen flag must be non-empty prior to calling this.
func (c *cmd) listenParts() (string, int, error) {
host, portRaw, err := net.SplitHostPort(c.listen)
if err != nil {
return "", 0, err
}
port, err := strconv.ParseInt(portRaw, 0, 0)
if err != nil {
return "", 0, err
}
return host, int(port), nil
}
func (c *cmd) Synopsis() string {
return synopsis
}

View File

@ -0,0 +1,293 @@
package proxy
import (
"fmt"
"log"
"os"
"sync"
"time"
"github.com/hashicorp/consul/api"
)
const (
// RegisterReconcilePeriod is how often the monitor will attempt to
// reconcile the expected service state with the remote Consul server.
RegisterReconcilePeriod = 30 * time.Second
// RegisterTTLPeriod is the TTL setting for the health check of the
// service. The monitor will automatically pass the health check
// three times per this period to be more resilient to failures.
RegisterTTLPeriod = 30 * time.Second
)
// RegisterMonitor registers the proxy with the local Consul agent with a TTL
// health check that is kept alive.
//
// This struct should be intialized with NewRegisterMonitor instead of being
// allocated directly. Using this struct without calling NewRegisterMonitor
// will result in panics.
type RegisterMonitor struct {
// Logger is the logger for the monitor.
Logger *log.Logger
// Client is the API client to a specific Consul agent. This agent is
// where the service will be registered.
Client *api.Client
// Service is the name of the service being proxied.
Service string
// LocalAddress and LocalPort are the address and port of the proxy
// itself, NOT the service being proxied.
LocalAddress string
LocalPort int
// IDSuffix is a unique ID that is appended to the end of the service
// name. This helps the service be unique. By default the service ID
// is just the proxied service name followed by "-proxy".
IDSuffix string
// The fields below are related to timing settings. See the default
// constants for more documentation on what they set.
ReconcilePeriod time.Duration
TTLPeriod time.Duration
// lock is held while reading/writing any internal state of the monitor.
// cond is a condition variable on lock that is broadcasted for runState
// changes.
lock *sync.Mutex
cond *sync.Cond
// runState is the current state of the monitor. To read this the
// lock must be held. The condition variable cond can be waited on
// for changes to this value.
runState registerRunState
}
// registerState is the state of the RegisterMonitor.
//
// This is a basic state machine with the following transitions:
//
// * idle => running, stopped
// * running => stopping, stopped
// * stopping => stopped
// * stopped => <>
//
type registerRunState uint8
const (
registerStateIdle registerRunState = iota
registerStateRunning
registerStateStopping
registerStateStopped
)
// NewRegisterMonitor initializes a RegisterMonitor. After initialization,
// the exported fields should be configured as desired. To start the monitor,
// execute Run in a goroutine.
func NewRegisterMonitor() *RegisterMonitor {
var lock sync.Mutex
return &RegisterMonitor{
Logger: log.New(os.Stderr, "", log.LstdFlags), // default logger
ReconcilePeriod: RegisterReconcilePeriod,
TTLPeriod: RegisterTTLPeriod,
lock: &lock,
cond: sync.NewCond(&lock),
}
}
// Run should be started in a goroutine and will keep Consul updated
// in the background with the state of this proxy. If registration fails
// this will continue to retry.
func (r *RegisterMonitor) Run() {
// Grab the lock and set our state. If we're not idle, then we return
// immediately since the monitor is only allowed to run once.
r.lock.Lock()
if r.runState != registerStateIdle {
r.lock.Unlock()
return
}
r.runState = registerStateRunning
r.lock.Unlock()
// Start a goroutine that just waits for a stop request
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
r.lock.Lock()
defer r.lock.Unlock()
// We wait for anything not running, just so we're more resilient
// in the face of state machine issues. Basically any state change
// will cause us to quit.
for r.runState == registerStateRunning {
r.cond.Wait()
}
}()
// When we exit, we set the state to stopped and broadcast to any
// waiting Close functions that they can return.
defer func() {
r.lock.Lock()
r.runState = registerStateStopped
r.cond.Broadcast()
r.lock.Unlock()
}()
// Run the first registration optimistically. If this fails then its
// okay since we'll just retry shortly.
r.register()
// Create the timers for trigger events. We don't use tickers because
// we don't want the events to pile on.
reconcileTimer := time.NewTimer(r.ReconcilePeriod)
heartbeatTimer := time.NewTimer(r.TTLPeriod / 3)
for {
select {
case <-reconcileTimer.C:
r.register()
reconcileTimer.Reset(r.ReconcilePeriod)
case <-heartbeatTimer.C:
r.heartbeat()
heartbeatTimer.Reset(r.TTLPeriod / 3)
case <-stopCh:
r.Logger.Printf("[INFO] proxy: stop request received, deregistering")
r.deregister()
return
}
}
}
// register queries the Consul agent to determine if we've already registered.
// If we haven't or the registered service differs from what we're trying to
// register, then we attempt to register our service.
func (r *RegisterMonitor) register() {
catalog := r.Client.Catalog()
serviceID := r.serviceID()
serviceName := r.serviceName()
// Determine the current state of this service in Consul
var currentService *api.CatalogService
services, _, err := catalog.Service(
serviceName, "",
&api.QueryOptions{AllowStale: true})
if err == nil {
for _, service := range services {
if serviceID == service.ServiceID {
currentService = service
break
}
}
}
// If we have a matching service, then do nothing
if currentService != nil {
r.Logger.Printf("[DEBUG] proxy: service already registered, not re-registering")
return
}
// If we're here, then we're registering the service.
err = r.Client.Agent().ServiceRegister(&api.AgentServiceRegistration{
Kind: api.ServiceKindConnectProxy,
ProxyDestination: r.Service,
ID: serviceID,
Name: serviceName,
Address: r.LocalAddress,
Port: r.LocalPort,
Check: &api.AgentServiceCheck{
CheckID: r.checkID(),
Name: "proxy heartbeat",
TTL: "30s",
Notes: "Built-in proxy will heartbeat this check.",
Status: "passing",
},
})
if err != nil {
r.Logger.Printf("[WARN] proxy: Failed to register Consul service: %s", err)
return
}
r.Logger.Printf("[INFO] proxy: registered Consul service: %s", serviceID)
}
// heartbeat just pings the TTL check for our service.
func (r *RegisterMonitor) heartbeat() {
// Trigger the health check passing. We don't need to retry this
// since we do a couple tries within the TTL period.
if err := r.Client.Agent().PassTTL(r.checkID(), ""); err != nil {
r.Logger.Printf("[WARN] proxy: heartbeat failed: %s", err)
}
}
// deregister deregisters the service.
func (r *RegisterMonitor) deregister() {
// Basic retry loop, no backoff for now. But we want to retry a few
// times just in case there are basic ephemeral issues.
for i := 0; i < 3; i++ {
err := r.Client.Agent().ServiceDeregister(r.serviceID())
if err == nil {
return
}
r.Logger.Printf("[WARN] proxy: service deregister failed: %s", err)
time.Sleep(500 * time.Millisecond)
}
}
// Close stops the register goroutines and deregisters the service. Once
// Close is called, the monitor can no longer be used again. It is safe to
// call Close multiple times and concurrently.
func (r *RegisterMonitor) Close() error {
r.lock.Lock()
defer r.lock.Unlock()
for {
switch r.runState {
case registerStateIdle:
// Idle so just set it to stopped and return. We notify
// the condition variable in case others are waiting.
r.runState = registerStateStopped
r.cond.Broadcast()
return nil
case registerStateRunning:
// Set the state to stopping and broadcast to all waiters,
// since Run is sitting on cond.Wait.
r.runState = registerStateStopping
r.cond.Broadcast()
r.cond.Wait() // Wait on the stopping event
case registerStateStopping:
// Still stopping, wait...
r.cond.Wait()
case registerStateStopped:
// Stopped, target state reached
return nil
}
}
}
// serviceID returns the unique ID for this proxy service.
func (r *RegisterMonitor) serviceID() string {
id := fmt.Sprintf("%s-proxy", r.Service)
if r.IDSuffix != "" {
id += "-" + r.IDSuffix
}
return id
}
// serviceName returns the non-unique name of this proxy service.
func (r *RegisterMonitor) serviceName() string {
return fmt.Sprintf("%s-proxy", r.Service)
}
// checkID is the unique ID for the registered health check.
func (r *RegisterMonitor) checkID() string {
return fmt.Sprintf("%s-ttl", r.serviceID())
}