Add connect expose CLI command

This commit is contained in:
Kyle Havlovitz 2020-06-05 14:54:29 -07:00
parent fed7489a37
commit b874c8ef0c
5 changed files with 535 additions and 6 deletions

View File

@ -9,6 +9,8 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
const ConfigEntryNotFoundErr string = "Config entry not found"
// Config switches on the different CRUD operations for config entries.
func (s *HTTPServer) Config(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
@ -48,7 +50,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int
setMeta(resp, &reply.QueryMeta)
if reply.Entry == nil {
return nil, NotFoundError{Reason: fmt.Sprintf("Config entry not found for %q / %q", pathArgs[0], pathArgs[1])}
return nil, NotFoundError{Reason: fmt.Sprintf("%s for %q / %q", ConfigEntryNotFoundErr, pathArgs[0], pathArgs[1])}
}
return reply.Entry, nil

View File

@ -51,7 +51,8 @@ import (
caget "github.com/hashicorp/consul/command/connect/ca/get"
caset "github.com/hashicorp/consul/command/connect/ca/set"
"github.com/hashicorp/consul/command/connect/envoy"
"github.com/hashicorp/consul/command/connect/envoy/pipe-bootstrap"
pipebootstrap "github.com/hashicorp/consul/command/connect/envoy/pipe-bootstrap"
"github.com/hashicorp/consul/command/connect/expose"
"github.com/hashicorp/consul/command/connect/proxy"
"github.com/hashicorp/consul/command/debug"
"github.com/hashicorp/consul/command/event"
@ -169,6 +170,7 @@ func init() {
Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil })
Register("connect envoy", func(ui cli.Ui) (cli.Command, error) { return envoy.New(ui), nil })
Register("connect envoy pipe-bootstrap", func(ui cli.Ui) (cli.Command, error) { return pipebootstrap.New(ui), nil })
Register("connect expose", func(ui cli.Ui) (cli.Command, error) { return expose.New(ui), nil })
Register("debug", func(ui cli.Ui) (cli.Command, error) { return debug.New(ui, MakeShutdownCh()), nil })
Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil })
Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil })

View File

@ -0,0 +1,236 @@
package expose
import (
"flag"
"fmt"
"strconv"
"strings"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/consul/command/intention/create"
"github.com/hashicorp/consul/command/intention/finder"
"github.com/mitchellh/cli"
)
func New(ui cli.Ui) *cmd {
c := &cmd{UI: ui}
c.init()
return c
}
type cmd struct {
UI cli.Ui
flags *flag.FlagSet
http *flags.HTTPFlags
help string
// flags
ingressGateway string
service string
portRaw string
port int
protocol string
}
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.flags.StringVar(&c.ingressGateway, "ingress-gateway", "",
"The name of the ingress gateway service to use. Required.")
c.flags.StringVar(&c.service, "service", "",
"The name of destination service to expose. Required.")
c.flags.StringVar(&c.portRaw, "port", "",
"The listener port to use for the service on the Ingress gateway. Required.")
c.flags.StringVar(&c.protocol, "protocol", "tcp",
"The protocol for the service. Defaults to 'tcp'. Optional.")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
flags.Merge(c.flags, c.http.ServerFlags())
c.help = flags.Usage(help, c.flags)
}
func (c *cmd) Run(args []string) int {
if err := c.flags.Parse(args); err != nil {
if err == flag.ErrHelp {
return 0
}
c.UI.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
// Set up a client.
client, err := c.http.APIClient()
if err != nil {
c.UI.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Check for any missing or invalid flag values.
if c.service == "" {
c.UI.Error("A service name must be given via the -service flag.")
return 1
}
svc, svcNamespace, err := create.ParseIntentionTarget(c.service)
if err != nil {
c.UI.Error(fmt.Sprintf("Invalid service name: %s", err))
return 1
}
if c.ingressGateway == "" {
c.UI.Error("An ingress gateway service must be given via the -ingress-gateway flag.")
return 1
}
gateway, gatewayNamespace, err := create.ParseIntentionTarget(c.ingressGateway)
if err != nil {
c.UI.Error(fmt.Sprintf("Invalid ingress gateway name: %s", err))
return 1
}
if c.portRaw == "" {
c.UI.Error("A port must be provided via the -port flag.")
return 1
} else {
c.port, err = strconv.Atoi(c.portRaw)
if err != nil {
c.UI.Error(fmt.Sprintf("Error parsing port: %s", err))
return 1
}
}
// First get the config entry for the ingress gateway, if it exists. Don't error if it's a 404 as that
// just means we'll need to create a new config entry.
conf, _, err := client.ConfigEntries().Get(api.IngressGateway, gateway, nil)
if err != nil && !strings.Contains(err.Error(), agent.ConfigEntryNotFoundErr) {
c.UI.Error(fmt.Sprintf("Error fetching existing ingress gateway configuration: %s", err))
return 1
}
if conf == nil {
conf = &api.IngressGatewayConfigEntry{
Kind: api.IngressGateway,
Name: gateway,
Namespace: gatewayNamespace,
}
}
// Make sure the flags don't conflict with existing config.
ingressConf, ok := conf.(*api.IngressGatewayConfigEntry)
if !ok {
// This should never happen
c.UI.Error(fmt.Sprintf("Config entry is an invalid type: %T", conf))
return 1
}
listenerIdx := -1
for i, listener := range ingressConf.Listeners {
// Make sure the service isn't already exposed in this gateway
for _, service := range listener.Services {
if service.Name == svc {
c.UI.Error(fmt.Sprintf("Service %q already exposed through listener with port %d", svc, listener.Port))
goto CREATE_INTENTION
}
}
// If there's already a listener for the given port, make sure the protocol matches.
if listener.Port == c.port {
listenerIdx = i
if listener.Protocol != c.protocol {
c.UI.Error(fmt.Sprintf("Listener on port %d already configured with conflicting protocol %q", listener.Port, listener.Protocol))
return 1
}
}
}
// Add a service to the existing listener for the port if one exists, or make a new listener.
if listenerIdx >= 0 {
ingressConf.Listeners[listenerIdx].Services = append(ingressConf.Listeners[listenerIdx].Services, api.IngressService{
Name: svc,
Namespace: svcNamespace,
})
} else {
ingressConf.Listeners = append(ingressConf.Listeners, api.IngressListener{
Port: c.port,
Protocol: c.protocol,
Services: []api.IngressService{
{
Name: svc,
Namespace: svcNamespace,
},
},
})
}
// Write the updated config entry using a check-and-set, so it fails if the entry
// has been changed since we looked it up.
{
succeeded, _, err := client.ConfigEntries().CAS(ingressConf, ingressConf.GetModifyIndex(), nil)
if err != nil {
c.UI.Error(fmt.Sprintf("Error writing ingress config entry: %v", err))
return 1
}
if !succeeded {
c.UI.Error("Ingress config entry was changed while attempting to update, please try again.")
return 1
}
c.UI.Output(fmt.Sprintf("Successfully updated config entry for ingress service %q", gateway))
}
CREATE_INTENTION:
// Check for an existing intention.
ixnFinder := finder.Finder{Client: client}
existing, err := ixnFinder.Find(c.ingressGateway, c.service)
if err != nil {
c.UI.Error(fmt.Sprintf("Error looking up existing intention: %s", err))
return 1
}
if existing != nil && existing.Action == api.IntentionActionAllow {
c.UI.Error(fmt.Sprintf("Intention already exists for %q -> %q", c.ingressGateway, c.service))
return 0
}
// Add the intention between the gateway service and the destination.
ixn := &api.Intention{
SourceName: gateway,
SourceNS: gatewayNamespace,
DestinationName: svc,
DestinationNS: svcNamespace,
SourceType: api.IntentionSourceConsul,
Action: api.IntentionActionAllow,
}
if existing == nil {
_, _, err = client.Connect().IntentionCreate(ixn, nil)
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating intention: %s", err))
return 1
}
} else {
_, err = client.Connect().IntentionUpdate(ixn, nil)
if err != nil {
c.UI.Error(fmt.Sprintf("Error updating intention: %s", err))
return 1
}
}
c.UI.Output(fmt.Sprintf("Successfully set up intention for %q -> %q", c.ingressGateway, c.service))
return 0
}
func (c *cmd) Synopsis() string {
return synopsis
}
func (c *cmd) Help() string {
return c.help
}
const synopsis = "Expose a Connect-enabled service through an Ingress gateway"
const help = `
Usage: consul connect expose [options]
Exposes a Connect-enabled service through the given ingress gateway, using the
given protocol and port.
`

View File

@ -0,0 +1,289 @@
package expose
import (
"testing"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestConnectExpose(t *testing.T) {
t.Parallel()
require := require.New(t)
a := agent.NewTestAgent(t, ``)
client := a.Client()
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
{
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
"-service=foo",
"-ingress-gateway=ingress",
"-port=8888",
"-protocol=tcp",
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
}
// Make sure the config entry and intention have been created.
entry, _, err := client.ConfigEntries().Get(api.IngressGateway, "ingress", nil)
require.NoError(err)
expected := &api.IngressGatewayConfigEntry{
Kind: api.IngressGateway,
Name: "ingress",
Listeners: []api.IngressListener{
{
Port: 8888,
Protocol: "tcp",
Services: []api.IngressService{
{
Name: "foo",
},
},
},
},
}
expected.CreateIndex = entry.GetCreateIndex()
expected.ModifyIndex = entry.GetModifyIndex()
require.Equal(expected, entry)
ixns, _, err := client.Connect().Intentions(nil)
require.NoError(err)
require.Len(ixns, 1)
require.Equal("ingress", ixns[0].SourceName)
require.Equal("foo", ixns[0].DestinationName)
// Run the command again with a different port, make sure the config entry
// and intentions aren't modified.
{
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
"-service=foo",
"-ingress-gateway=ingress",
"-port=7777",
"-protocol=tcp",
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
// Make sure the config entry/intention weren't affected.
entry, _, err = client.ConfigEntries().Get(api.IngressGateway, "ingress", nil)
require.NoError(err)
require.Equal(expected, entry)
ixns, _, err = client.Connect().Intentions(nil)
require.NoError(err)
require.Len(ixns, 1)
require.Equal("ingress", ixns[0].SourceName)
require.Equal("foo", ixns[0].DestinationName)
}
// Run the command again with a conflicting protocol, should exit with an error and
// cause no changes to config entry/intentions.
{
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
"-service=bar",
"-ingress-gateway=ingress",
"-port=8888",
"-protocol=http",
}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
require.Contains(ui.ErrorWriter.String(), `conflicting protocol "tcp"`)
// Make sure the config entry/intention weren't affected.
entry, _, err = client.ConfigEntries().Get(api.IngressGateway, "ingress", nil)
require.NoError(err)
require.Equal(expected, entry)
ixns, _, err = client.Connect().Intentions(nil)
require.NoError(err)
require.Len(ixns, 1)
require.Equal("ingress", ixns[0].SourceName)
require.Equal("foo", ixns[0].DestinationName)
}
}
func TestConnectExpose_invalidFlags(t *testing.T) {
t.Parallel()
require := require.New(t)
a := agent.NewTestAgent(t, ``)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
t.Run("missing service", func(t *testing.T) {
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
require.Contains(ui.ErrorWriter.String(), "A service name must be given")
})
t.Run("missing gateway", func(t *testing.T) {
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
"-service=foo",
}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
require.Contains(ui.ErrorWriter.String(), "An ingress gateway service must be given")
})
t.Run("missing port", func(t *testing.T) {
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
"-service=foo",
"-ingress-gateway=ingress",
}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
require.Contains(ui.ErrorWriter.String(), "A port must be provided")
})
}
func TestConnectExpose_existingConfig(t *testing.T) {
t.Parallel()
require := require.New(t)
a := agent.NewTestAgent(t, ``)
client := a.Client()
defer a.Shutdown()
// Create some service config entries to set their protocol.
for _, service := range []string{"bar", "zoo"} {
_, _, err := client.ConfigEntries().Set(&api.ServiceConfigEntry{
Kind: "service-defaults",
Name: service,
Protocol: "http",
}, nil)
require.NoError(err)
}
// Create an existing ingress config entry with some services.
ingressConf := &api.IngressGatewayConfigEntry{
Kind: api.IngressGateway,
Name: "ingress",
Listeners: []api.IngressListener{
{
Port: 8888,
Protocol: "tcp",
Services: []api.IngressService{
{
Name: "foo",
},
},
},
{
Port: 9999,
Protocol: "http",
Services: []api.IngressService{
{
Name: "bar",
},
},
},
},
}
_, _, err := client.ConfigEntries().Set(ingressConf, nil)
require.NoError(err)
// Add a service on a new port.
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
{
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
"-service=baz",
"-ingress-gateway=ingress",
"-port=10000",
"-protocol=tcp",
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
// Make sure the ingress config was updated and existing services preserved.
entry, _, err := client.ConfigEntries().Get(api.IngressGateway, "ingress", nil)
require.NoError(err)
ingressConf.Listeners = append(ingressConf.Listeners, api.IngressListener{
Port: 10000,
Protocol: "tcp",
Services: []api.IngressService{
{
Name: "baz",
},
},
})
ingressConf.CreateIndex = entry.GetCreateIndex()
ingressConf.ModifyIndex = entry.GetModifyIndex()
require.Equal(ingressConf, entry)
}
// Add an service on a port shared with an existing listener.
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
{
ui := cli.NewMockUi()
c := New(ui)
args := []string{
"-http-addr=" + a.HTTPAddr(),
"-service=zoo",
"-ingress-gateway=ingress",
"-port=9999",
"-protocol=http",
}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
// Make sure the ingress config was updated and existing services preserved.
entry, _, err := client.ConfigEntries().Get(api.IngressGateway, "ingress", nil)
require.NoError(err)
ingressConf.Listeners[1].Services = append(ingressConf.Listeners[1].Services, api.IngressService{
Name: "zoo",
})
ingressConf.CreateIndex = entry.GetCreateIndex()
ingressConf.ModifyIndex = entry.GetModifyIndex()
require.Equal(ingressConf, entry)
}
}

View File

@ -136,10 +136,10 @@ func (c *cmd) Run(args []string) int {
return 0
}
// parseIntentionTarget parses a target of the form <namespace>/<name> and returns
// ParseIntentionTarget parses a target of the form <namespace>/<name> and returns
// the two distinct parts. In some cases the namespace may be elided and this function
// will return the empty string for the namespace then.
func parseIntentionTarget(input string) (name string, namespace string, err error) {
func ParseIntentionTarget(input string) (name string, namespace string, err error) {
// Get the index to the '/'. If it doesn't exist, we have just a name
// so just set that and return.
idx := strings.IndexByte(input, '/')
@ -171,12 +171,12 @@ func (c *cmd) ixnsFromArgs(args []string) ([]*api.Intention, error) {
return nil, fmt.Errorf("Must specify two arguments: source and destination")
}
srcName, srcNamespace, err := parseIntentionTarget(args[0])
srcName, srcNamespace, err := ParseIntentionTarget(args[0])
if err != nil {
return nil, fmt.Errorf("Invalid intention source: %v", err)
}
dstName, dstNamespace, err := parseIntentionTarget(args[1])
dstName, dstNamespace, err := ParseIntentionTarget(args[1])
if err != nil {
return nil, fmt.Errorf("Invalid intention destination: %v", err)
}