mirror of https://github.com/status-im/consul.git
Original proxy and connect.Client implementation. Working end to end.
This commit is contained in:
parent
95da20ffd7
commit
69d5efdbbd
|
@ -6,6 +6,8 @@ import (
|
|||
catlistdc "github.com/hashicorp/consul/command/catalog/list/dc"
|
||||
catlistnodes "github.com/hashicorp/consul/command/catalog/list/nodes"
|
||||
catlistsvc "github.com/hashicorp/consul/command/catalog/list/services"
|
||||
"github.com/hashicorp/consul/command/connect"
|
||||
"github.com/hashicorp/consul/command/connect/proxy"
|
||||
"github.com/hashicorp/consul/command/event"
|
||||
"github.com/hashicorp/consul/command/exec"
|
||||
"github.com/hashicorp/consul/command/forceleave"
|
||||
|
@ -58,6 +60,8 @@ func init() {
|
|||
Register("catalog datacenters", func(ui cli.Ui) (cli.Command, error) { return catlistdc.New(ui), nil })
|
||||
Register("catalog nodes", func(ui cli.Ui) (cli.Command, error) { return catlistnodes.New(ui), nil })
|
||||
Register("catalog services", func(ui cli.Ui) (cli.Command, error) { return catlistsvc.New(ui), nil })
|
||||
Register("connect", func(ui cli.Ui) (cli.Command, error) { return connect.New(), nil })
|
||||
Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil })
|
||||
Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil })
|
||||
Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil })
|
||||
Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil })
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package connect
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/command/flags"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func New() *cmd {
|
||||
return &cmd{}
|
||||
}
|
||||
|
||||
type cmd struct{}
|
||||
|
||||
func (c *cmd) Run(args []string) int {
|
||||
return cli.RunResultHelp
|
||||
}
|
||||
|
||||
func (c *cmd) Synopsis() string {
|
||||
return synopsis
|
||||
}
|
||||
|
||||
func (c *cmd) Help() string {
|
||||
return flags.Usage(help, nil)
|
||||
}
|
||||
|
||||
const synopsis = "Interact with Consul Connect"
|
||||
const help = `
|
||||
Usage: consul connect <subcommand> [options] [args]
|
||||
|
||||
This command has subcommands for interacting with Consul Connect.
|
||||
|
||||
Here are some simple examples, and more detailed examples are available
|
||||
in the subcommands or the documentation.
|
||||
|
||||
Run the built-in Connect mTLS proxy
|
||||
|
||||
$ consul connect proxy
|
||||
|
||||
For more examples, ask for subcommand help or view the documentation.
|
||||
`
|
|
@ -0,0 +1,13 @@
|
|||
package connect
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCatalogCommand_noTabs(t *testing.T) {
|
||||
t.Parallel()
|
||||
if strings.ContainsRune(New().Help(), '\t') {
|
||||
t.Fatal("help has tabs")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,166 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
// Expose pprof if configured
|
||||
_ "net/http/pprof"
|
||||
|
||||
"github.com/hashicorp/consul/command/flags"
|
||||
proxyImpl "github.com/hashicorp/consul/proxy"
|
||||
|
||||
"github.com/hashicorp/consul/logger"
|
||||
"github.com/hashicorp/logutils"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func New(ui cli.Ui, shutdownCh <-chan struct{}) *cmd {
|
||||
c := &cmd{UI: ui, shutdownCh: shutdownCh}
|
||||
c.init()
|
||||
return c
|
||||
}
|
||||
|
||||
type cmd struct {
|
||||
UI cli.Ui
|
||||
flags *flag.FlagSet
|
||||
http *flags.HTTPFlags
|
||||
help string
|
||||
|
||||
shutdownCh <-chan struct{}
|
||||
|
||||
logFilter *logutils.LevelFilter
|
||||
logOutput io.Writer
|
||||
logger *log.Logger
|
||||
|
||||
// flags
|
||||
logLevel string
|
||||
cfgFile string
|
||||
proxyID string
|
||||
pprofAddr string
|
||||
}
|
||||
|
||||
func (c *cmd) init() {
|
||||
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
|
||||
|
||||
c.flags.StringVar(&c.cfgFile, "insecure-dev-config", "",
|
||||
"If set, proxy config is read on startup from this file (in HCL or JSON"+
|
||||
"format). If a config file is given, the proxy will use that instead of "+
|
||||
"querying the local agent for it's configuration. It will not reload it "+
|
||||
"except on startup. In this mode the proxy WILL NOT authorize incoming "+
|
||||
"connections with the local agent which is totally insecure. This is "+
|
||||
"ONLY for development and testing.")
|
||||
|
||||
c.flags.StringVar(&c.proxyID, "proxy-id", "",
|
||||
"The proxy's ID on the local agent.")
|
||||
|
||||
c.flags.StringVar(&c.logLevel, "log-level", "INFO",
|
||||
"Specifies the log level.")
|
||||
|
||||
c.flags.StringVar(&c.pprofAddr, "pprof-addr", "",
|
||||
"Enable debugging via pprof. Providing a host:port (or just ':port') "+
|
||||
"enables profiling HTTP endpoints on that address.")
|
||||
|
||||
c.http = &flags.HTTPFlags{}
|
||||
flags.Merge(c.flags, c.http.ClientFlags())
|
||||
flags.Merge(c.flags, c.http.ServerFlags())
|
||||
c.help = flags.Usage(help, c.flags)
|
||||
}
|
||||
|
||||
func (c *cmd) Run(args []string) int {
|
||||
if err := c.flags.Parse(args); err != nil {
|
||||
return 1
|
||||
}
|
||||
|
||||
// Setup the log outputs
|
||||
logConfig := &logger.Config{
|
||||
LogLevel: c.logLevel,
|
||||
}
|
||||
logFilter, logGate, _, logOutput, ok := logger.Setup(logConfig, c.UI)
|
||||
if !ok {
|
||||
return 1
|
||||
}
|
||||
c.logFilter = logFilter
|
||||
c.logOutput = logOutput
|
||||
c.logger = log.New(logOutput, "", log.LstdFlags)
|
||||
|
||||
// Enable Pprof if needed
|
||||
if c.pprofAddr != "" {
|
||||
go func() {
|
||||
c.UI.Output(fmt.Sprintf("Starting pprof HTTP endpoints on "+
|
||||
"http://%s/debug/pprof", c.pprofAddr))
|
||||
log.Fatal(http.ListenAndServe(c.pprofAddr, nil))
|
||||
}()
|
||||
}
|
||||
|
||||
// Setup Consul client
|
||||
client, err := c.http.APIClient()
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
var p *proxyImpl.Proxy
|
||||
if c.cfgFile != "" {
|
||||
c.UI.Info("Configuring proxy locally from " + c.cfgFile)
|
||||
|
||||
p, err = proxyImpl.NewFromConfigFile(client, c.cfgFile, c.logger)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed configuring from file: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
} else {
|
||||
p, err = proxyImpl.New(client, c.proxyID, c.logger)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed configuring from agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
err := p.Run(ctx)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed running proxy: %s", err))
|
||||
}
|
||||
// If we exited early due to a fatal error, need to unblock the main
|
||||
// routine. But we can't close shutdownCh since it might already be closed
|
||||
// by a signal and there is no way to tell. We also can't send on it to
|
||||
// unblock main routine since it's typed as receive only. So the best thing
|
||||
// we can do is cancel the context and have the main routine select on both.
|
||||
cancel()
|
||||
}()
|
||||
|
||||
c.UI.Output("Consul Connect proxy running!")
|
||||
|
||||
c.UI.Output("Log data will now stream in as it occurs:\n")
|
||||
logGate.Flush()
|
||||
|
||||
// Wait for shutdown or context cancel (see Run() goroutine above)
|
||||
select {
|
||||
case <-c.shutdownCh:
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
}
|
||||
c.UI.Output("Consul Connect proxy shutdown")
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *cmd) Synopsis() string {
|
||||
return synopsis
|
||||
}
|
||||
|
||||
func (c *cmd) Help() string {
|
||||
return c.help
|
||||
}
|
||||
|
||||
const synopsis = "Runs a Consul Connect proxy"
|
||||
const help = `
|
||||
Usage: consul proxy [options]
|
||||
|
||||
Starts a Consul Connect proxy and runs until an interrupt is received.
|
||||
`
|
|
@ -0,0 +1 @@
|
|||
package proxy
|
|
@ -0,0 +1,43 @@
|
|||
package connect
|
||||
|
||||
import "crypto/x509"
|
||||
|
||||
// Auther is the interface that provides both Authentication and Authorization
|
||||
// for an mTLS connection. It's only method is compatible with
|
||||
// tls.Config.VerifyPeerCertificate.
|
||||
type Auther interface {
|
||||
// Auth is called during tls Connection establishment to Authenticate and
|
||||
// Authorize the presented peer. Note that verifiedChains must not be relied
|
||||
// upon as we typically have to skip Go's internal verification so the
|
||||
// implementation takes full responsibility to validating the certificate
|
||||
// against known roots. It is also up to the user of the interface to ensure
|
||||
// appropriate validation is performed for client or server end by arranging
|
||||
// for an appropriate implementation to be hooked into the tls.Config used.
|
||||
Auth(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
|
||||
}
|
||||
|
||||
// ClientAuther is used to auth Clients connecting to a Server.
|
||||
type ClientAuther struct{}
|
||||
|
||||
// Auth implements Auther
|
||||
func (a *ClientAuther) Auth(rawCerts [][]byte,
|
||||
verifiedChains [][]*x509.Certificate) error {
|
||||
|
||||
// TODO(banks): implement path validation and AuthZ
|
||||
return nil
|
||||
}
|
||||
|
||||
// ServerAuther is used to auth the Server identify from a connecting Client.
|
||||
type ServerAuther struct {
|
||||
// TODO(banks): We'll need a way to pass the expected service identity (name,
|
||||
// namespace, dc, cluster) here based on discovery result.
|
||||
}
|
||||
|
||||
// Auth implements Auther
|
||||
func (a *ServerAuther) Auth(rawCerts [][]byte,
|
||||
verifiedChains [][]*x509.Certificate) error {
|
||||
|
||||
// TODO(banks): implement path validation and verify URI matches the target
|
||||
// service we intended to connect to.
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,256 @@
|
|||
package connect
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
// CertStatus indicates whether the Client currently has valid certificates for
|
||||
// incoming and outgoing connections.
|
||||
type CertStatus int
|
||||
|
||||
const (
|
||||
// CertStatusUnknown is the zero value for CertStatus which may be returned
|
||||
// when a watch channel is closed on shutdown. It has no other meaning.
|
||||
CertStatusUnknown CertStatus = iota
|
||||
|
||||
// CertStatusOK indicates the client has valid certificates and trust roots to
|
||||
// Authenticate incoming and outgoing connections.
|
||||
CertStatusOK
|
||||
|
||||
// CertStatusPending indicates the client is waiting to be issued initial
|
||||
// certificates, or that it's certificates have expired and it's waiting to be
|
||||
// issued new ones. In this state all incoming and outgoing connections will
|
||||
// fail.
|
||||
CertStatusPending
|
||||
)
|
||||
|
||||
func (s CertStatus) String() string {
|
||||
switch s {
|
||||
case CertStatusOK:
|
||||
return "OK"
|
||||
case CertStatusPending:
|
||||
return "pending"
|
||||
case CertStatusUnknown:
|
||||
fallthrough
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// Client is the interface a basic client implementation must support.
|
||||
type Client interface {
|
||||
// TODO(banks): build this and test it
|
||||
// CertStatus returns the current status of the client's certificates. It can
|
||||
// be used to determine if the Client is able to service requests at the
|
||||
// current time.
|
||||
//CertStatus() CertStatus
|
||||
|
||||
// TODO(banks): build this and test it
|
||||
// WatchCertStatus returns a channel that is notified on all status changes.
|
||||
// Note that a message on the channel isn't guaranteed to be different so it's
|
||||
// value should be inspected. During Client shutdown the channel will be
|
||||
// closed returning a zero type which is equivalent to CertStatusUnknown.
|
||||
//WatchCertStatus() <-chan CertStatus
|
||||
|
||||
// ServerTLSConfig returns the *tls.Config to be used when creating a TCP
|
||||
// listener that should accept Connect connections. It is likely that at
|
||||
// startup the tlsCfg returned will not be immediately usable since
|
||||
// certificates are typically fetched from the agent asynchronously. In this
|
||||
// case it's still safe to listen with the provided config, but auth failures
|
||||
// will occur until initial certificate discovery is complete. In general at
|
||||
// any time it is possible for certificates to expire before new replacements
|
||||
// have been issued due to local network errors so the server may not actually
|
||||
// have a working certificate configuration at any time, however as soon as
|
||||
// valid certs can be issued it will automatically start working again so
|
||||
// should take no action.
|
||||
ServerTLSConfig() (*tls.Config, error)
|
||||
|
||||
// DialService opens a new connection to the named service registered in
|
||||
// Consul. It will perform service discovery to find healthy instances. If
|
||||
// there is an error during connection it is returned and the caller may call
|
||||
// again. The client implementation makes a best effort to make consecutive
|
||||
// Dials against different instances either by randomising the list and/or
|
||||
// maintaining a local memory of which instances recently failed. If the
|
||||
// context passed times out before connection is established and verified an
|
||||
// error is returned.
|
||||
DialService(ctx context.Context, namespace, name string) (net.Conn, error)
|
||||
|
||||
// DialPreparedQuery opens a new connection by executing the named Prepared
|
||||
// Query against the local Consul agent, and picking one of the returned
|
||||
// instances to connect to. It will perform service discovery with the same
|
||||
// semantics as DialService.
|
||||
DialPreparedQuery(ctx context.Context, namespace, name string) (net.Conn, error)
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Maybe also convenience wrappers for:
|
||||
- listening TLS conn with right config
|
||||
- http.ListenAndServeTLS equivalent
|
||||
|
||||
*/
|
||||
|
||||
// AgentClient is the primary implementation of a connect.Client which
|
||||
// communicates with the local Consul agent.
|
||||
type AgentClient struct {
|
||||
agent *api.Client
|
||||
tlsCfg *ReloadableTLSConfig
|
||||
}
|
||||
|
||||
// NewClient returns an AgentClient to allow consuming and providing
|
||||
// Connect-enabled network services.
|
||||
func NewClient(agent *api.Client) Client {
|
||||
// TODO(banks): hook up fetching certs from Agent and updating tlsCfg on cert
|
||||
// delivery/change. Perhaps need to make
|
||||
return &AgentClient{
|
||||
agent: agent,
|
||||
tlsCfg: NewReloadableTLSConfig(defaultTLSConfig()),
|
||||
}
|
||||
}
|
||||
|
||||
// NewInsecureDevClientWithLocalCerts returns an AgentClient that will still do
|
||||
// service discovery via the local agent but will use externally provided
|
||||
// certificates and skip authorization. This is intended just for development
|
||||
// and must not be used ever in production.
|
||||
func NewInsecureDevClientWithLocalCerts(agent *api.Client, caFile, certFile,
|
||||
keyFile string) (Client, error) {
|
||||
|
||||
cfg, err := devTLSConfigFromFiles(caFile, certFile, keyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &AgentClient{
|
||||
agent: agent,
|
||||
tlsCfg: NewReloadableTLSConfig(cfg),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ServerTLSConfig implements Client
|
||||
func (c *AgentClient) ServerTLSConfig() (*tls.Config, error) {
|
||||
return c.tlsCfg.ServerTLSConfig(), nil
|
||||
}
|
||||
|
||||
// DialService implements Client
|
||||
func (c *AgentClient) DialService(ctx context.Context, namespace,
|
||||
name string) (net.Conn, error) {
|
||||
return c.dial(ctx, "service", namespace, name)
|
||||
}
|
||||
|
||||
// DialPreparedQuery implements Client
|
||||
func (c *AgentClient) DialPreparedQuery(ctx context.Context, namespace,
|
||||
name string) (net.Conn, error) {
|
||||
return c.dial(ctx, "prepared_query", namespace, name)
|
||||
}
|
||||
|
||||
func (c *AgentClient) dial(ctx context.Context, discoveryType, namespace,
|
||||
name string) (net.Conn, error) {
|
||||
|
||||
svcs, err := c.discoverInstances(ctx, discoveryType, namespace, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
svc, err := c.pickInstance(svcs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if svc == nil {
|
||||
return nil, fmt.Errorf("no healthy services discovered")
|
||||
}
|
||||
|
||||
// OK we have a service we can dial! We need a ClientAuther that will validate
|
||||
// the connection is legit.
|
||||
|
||||
// TODO(banks): implement ClientAuther properly to actually verify connected
|
||||
// cert matches the expected service/cluster etc. based on svc.
|
||||
auther := &ClientAuther{}
|
||||
tlsConfig := c.tlsCfg.TLSConfig(auther)
|
||||
|
||||
// Resolve address TODO(banks): I expected this to happen magically in the
|
||||
// agent at registration time if I register with no explicit address but
|
||||
// apparently doesn't. This is a quick hack to make it work for now, need to
|
||||
// see if there is a better shared code path for doing this.
|
||||
addr := svc.Service.Address
|
||||
if addr == "" {
|
||||
addr = svc.Node.Address
|
||||
}
|
||||
var dialer net.Dialer
|
||||
tcpConn, err := dialer.DialContext(ctx, "tcp",
|
||||
fmt.Sprintf("%s:%d", addr, svc.Service.Port))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsConn := tls.Client(tcpConn, tlsConfig)
|
||||
err = tlsConn.Handshake()
|
||||
if err != nil {
|
||||
tlsConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tlsConn, nil
|
||||
}
|
||||
|
||||
// pickInstance returns an instance from the given list to try to connect to. It
|
||||
// may be made pluggable later, for now it just picks a random one regardless of
|
||||
// whether the list is already shuffled.
|
||||
func (c *AgentClient) pickInstance(svcs []*api.ServiceEntry) (*api.ServiceEntry, error) {
|
||||
if len(svcs) < 1 {
|
||||
return nil, nil
|
||||
}
|
||||
idx := rand.Intn(len(svcs))
|
||||
return svcs[idx], nil
|
||||
}
|
||||
|
||||
// discoverInstances returns all instances for the given discoveryType,
|
||||
// namespace and name. The returned service entries may or may not be shuffled
|
||||
func (c *AgentClient) discoverInstances(ctx context.Context, discoverType,
|
||||
namespace, name string) ([]*api.ServiceEntry, error) {
|
||||
|
||||
q := &api.QueryOptions{
|
||||
// TODO(banks): make this configurable?
|
||||
AllowStale: true,
|
||||
}
|
||||
q = q.WithContext(ctx)
|
||||
|
||||
switch discoverType {
|
||||
case "service":
|
||||
svcs, _, err := c.agent.Health().Connect(name, "", true, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return svcs, err
|
||||
|
||||
case "prepared_query":
|
||||
// TODO(banks): it's not super clear to me how this should work eventually.
|
||||
// How do we distinguise between a PreparedQuery for the actual services and
|
||||
// one that should return the connect proxies where that differs? If we
|
||||
// can't then we end up with a janky UX where user specifies a reasonable
|
||||
// prepared query but we try to connect to non-connect services and fail
|
||||
// with a confusing TLS error. Maybe just a way to filter PreparedQuery
|
||||
// results by connect-enabled would be sufficient (or even metadata to do
|
||||
// that ourselves in the response although less efficient).
|
||||
resp, _, err := c.agent.PreparedQuery().Execute(name, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Awkward, we have a slice of api.ServiceEntry here but want a slice of
|
||||
// *api.ServiceEntry for compat with Connect/Service APIs. Have to convert
|
||||
// them to keep things type-happy.
|
||||
svcs := make([]*api.ServiceEntry, len(resp.Nodes))
|
||||
for idx, se := range resp.Nodes {
|
||||
svcs[idx] = &se
|
||||
}
|
||||
return svcs, err
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported discovery type: %s", discoverType)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
package connect
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/asn1"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewInsecureDevClientWithLocalCerts(t *testing.T) {
|
||||
|
||||
agent, err := api.NewClient(api.DefaultConfig())
|
||||
require.Nil(t, err)
|
||||
|
||||
got, err := NewInsecureDevClientWithLocalCerts(agent,
|
||||
"testdata/ca1-ca-consul-internal.cert.pem",
|
||||
"testdata/ca1-svc-web.cert.pem",
|
||||
"testdata/ca1-svc-web.key.pem",
|
||||
)
|
||||
require.Nil(t, err)
|
||||
|
||||
// Sanity check correct certs were loaded
|
||||
serverCfg, err := got.ServerTLSConfig()
|
||||
require.Nil(t, err)
|
||||
caSubjects := serverCfg.RootCAs.Subjects()
|
||||
require.Len(t, caSubjects, 1)
|
||||
caSubject, err := testNameFromRawDN(caSubjects[0])
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "Consul Internal", caSubject.CommonName)
|
||||
|
||||
require.Len(t, serverCfg.Certificates, 1)
|
||||
cert, err := x509.ParseCertificate(serverCfg.Certificates[0].Certificate[0])
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "web", cert.Subject.CommonName)
|
||||
}
|
||||
|
||||
func testNameFromRawDN(raw []byte) (*pkix.Name, error) {
|
||||
var seq pkix.RDNSequence
|
||||
if _, err := asn1.Unmarshal(raw, &seq); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var name pkix.Name
|
||||
name.FillFromRDNSequence(&seq)
|
||||
return &name, nil
|
||||
}
|
||||
|
||||
func testAgent(t *testing.T) (*testutil.TestServer, *api.Client) {
|
||||
t.Helper()
|
||||
|
||||
// Make client config
|
||||
conf := api.DefaultConfig()
|
||||
|
||||
// Create server
|
||||
server, err := testutil.NewTestServerConfigT(t, nil)
|
||||
require.Nil(t, err)
|
||||
|
||||
conf.Address = server.HTTPAddr
|
||||
|
||||
// Create client
|
||||
agent, err := api.NewClient(conf)
|
||||
require.Nil(t, err)
|
||||
|
||||
return server, agent
|
||||
}
|
||||
|
||||
func testService(t *testing.T, ca, name string, client *api.Client) *httptest.Server {
|
||||
t.Helper()
|
||||
|
||||
// Run a test service to discover
|
||||
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("svc: " + name))
|
||||
}))
|
||||
server.TLS = TestTLSConfig(t, ca, name)
|
||||
server.StartTLS()
|
||||
|
||||
u, err := url.Parse(server.URL)
|
||||
require.Nil(t, err)
|
||||
|
||||
port, err := strconv.Atoi(u.Port())
|
||||
require.Nil(t, err)
|
||||
|
||||
// If client is passed, register the test service instance
|
||||
if client != nil {
|
||||
svc := &api.AgentServiceRegistration{
|
||||
// TODO(banks): we don't really have a good way to represent
|
||||
// connect-native apps yet so we have to pretend out little server is a
|
||||
// proxy for now.
|
||||
Kind: api.ServiceKindConnectProxy,
|
||||
ProxyDestination: name,
|
||||
Name: name + "-proxy",
|
||||
Address: u.Hostname(),
|
||||
Port: port,
|
||||
}
|
||||
err := client.Agent().ServiceRegister(svc)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
return server
|
||||
}
|
||||
|
||||
func TestDialService(t *testing.T) {
|
||||
consulServer, agent := testAgent(t)
|
||||
defer consulServer.Stop()
|
||||
|
||||
svc := testService(t, "ca1", "web", agent)
|
||||
defer svc.Close()
|
||||
|
||||
c, err := NewInsecureDevClientWithLocalCerts(agent,
|
||||
"testdata/ca1-ca-consul-internal.cert.pem",
|
||||
"testdata/ca1-svc-web.cert.pem",
|
||||
"testdata/ca1-svc-web.key.pem",
|
||||
)
|
||||
require.Nil(t, err)
|
||||
|
||||
conn, err := c.DialService(context.Background(), "default", "web")
|
||||
require.Nilf(t, err, "err: %s", err)
|
||||
|
||||
// Inject our conn into http.Transport
|
||||
httpClient := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialTLS: func(network, addr string) (net.Conn, error) {
|
||||
return conn, nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Don't be fooled the hostname here is ignored since we did the dialling
|
||||
// ourselves
|
||||
resp, err := httpClient.Get("https://web.connect.consul/")
|
||||
require.Nil(t, err)
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Equal(t, "svc: web", string(body))
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICIDCCAcagAwIBAgIBATAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg
|
||||
SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAaMRgwFgYD
|
||||
VQQDEw9Db25zdWwgSW50ZXJuYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAT3
|
||||
IPiDHugKYEVaSpIzBjqU5lQrmirC6N1XHyOAhF2psGGxcxezpf8Vgy5Iv6XbmeHr
|
||||
cttyzUYtUKhrFBhxkPYRo4H8MIH5MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8E
|
||||
BTADAQH/MCkGA1UdDgQiBCCrnNQy2IQS73Co9WbrPXtq/YP9SvIBOJ8iYRWTOxjC
|
||||
qTArBgNVHSMEJDAigCCrnNQy2IQS73Co9WbrPXtq/YP9SvIBOJ8iYRWTOxjCqTA/
|
||||
BgNVHREEODA2hjRzcGlmZmU6Ly8xMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1
|
||||
NTU1NTU1NTUuY29uc3VsMD0GA1UdHgEB/wQzMDGgLzAtgisxMTExMTExMS0yMjIy
|
||||
LTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqGSM49BAMCA0gAMEUC
|
||||
IQDwWL6ZuszKrZjSJwDzdhRQtj1ppezJrKaDTJx+4F/tyQIgEaQCR935ztIqZzgO
|
||||
Ka6ozcH2Ubd4j4cDC1XswVMW6zs=
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIDUDO3I7WKbLTTWkNKA4unB2RLq/RX+L+XIFssDE/AD7oAoGCCqGSM49
|
||||
AwEHoUQDQgAE9yD4gx7oCmBFWkqSMwY6lOZUK5oqwujdVx8jgIRdqbBhsXMXs6X/
|
||||
FYMuSL+l25nh63Lbcs1GLVCoaxQYcZD2EQ==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,14 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICEDCCAbagAwIBAgIBBTAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg
|
||||
SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAQMQ4wDAYD
|
||||
VQQDEwVjYWNoZTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABOWw8369v4DHJAI6
|
||||
k061hU8rxaQs87mZFQ52JfleJjRoDUuZIPLhZHMFbvbI8pDWi7YdjluNbzNNh6nu
|
||||
fAivylujgfYwgfMwDgYDVR0PAQH/BAQDAgO4MB0GA1UdJQQWMBQGCCsGAQUFBwMC
|
||||
BggrBgEFBQcDATAMBgNVHRMBAf8EAjAAMCkGA1UdDgQiBCCHhMqV2/R8meSsXtwh
|
||||
OLC9hP7WQfuvwJ6V6uwKZdEofTArBgNVHSMEJDAigCCrnNQy2IQS73Co9WbrPXtq
|
||||
/YP9SvIBOJ8iYRWTOxjCqTBcBgNVHREEVTBThlFzcGlmZmU6Ly8xMTExMTExMS0y
|
||||
MjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsL25zL2RlZmF1bHQvZGMv
|
||||
ZGMwMS9zdmMvY2FjaGUwCgYIKoZIzj0EAwIDSAAwRQIgPfekKBd/ltpVkdjnB0Hp
|
||||
cV9HPwy12tXp4suR2nspSNkCIQD1Th/hvxuBKkRYy9Bl+jgTbrFdd4fLCWPeFbaM
|
||||
sgLK7g==
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIPTSPV2cWNnO69f+vYyCg5frpoBtK6L+kZVLrGCv3TdnoAoGCCqGSM49
|
||||
AwEHoUQDQgAE5bDzfr2/gMckAjqTTrWFTyvFpCzzuZkVDnYl+V4mNGgNS5kg8uFk
|
||||
cwVu9sjykNaLth2OW41vM02Hqe58CK/KWw==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,13 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICCjCCAbCgAwIBAgIBBDAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg
|
||||
SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjANMQswCQYD
|
||||
VQQDEwJkYjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABEcTyr2l7yYWZuh++02M
|
||||
usR20QrZtHdd7goKmYrIpQ3ekmHuLLgJWgTTaIhCj8fzbryep+s8oM7EiPhRQ14l
|
||||
uSujgfMwgfAwDgYDVR0PAQH/BAQDAgO4MB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr
|
||||
BgEFBQcDATAMBgNVHRMBAf8EAjAAMCkGA1UdDgQiBCAy6jHCBBT2bii+aMJCDJ33
|
||||
bFJtR72bxDBUi5b+YWyWwDArBgNVHSMEJDAigCCrnNQy2IQS73Co9WbrPXtq/YP9
|
||||
SvIBOJ8iYRWTOxjCqTBZBgNVHREEUjBQhk5zcGlmZmU6Ly8xMTExMTExMS0yMjIy
|
||||
LTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsL25zL2RlZmF1bHQvZGMvZGMw
|
||||
MS9zdmMvZGIwCgYIKoZIzj0EAwIDSAAwRQIhALCW4cOEpuYfLJ0NGwEmYG5Fko0N
|
||||
WMccL0gEQzKUbIWrAiAIw8wkTSf1O8vTHeKdR1fCmdVoDRFRKB643PaofUzFxA==
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIMHv1pjt75IjKXzl8l4rBtEFS1pEuOM4WNgeHg5Qn1RroAoGCCqGSM49
|
||||
AwEHoUQDQgAERxPKvaXvJhZm6H77TYy6xHbRCtm0d13uCgqZisilDd6SYe4suAla
|
||||
BNNoiEKPx/NuvJ6n6zygzsSI+FFDXiW5Kw==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,13 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICDDCCAbKgAwIBAgIBAzAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg
|
||||
SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAOMQwwCgYD
|
||||
VQQDEwN3ZWIwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARF47lERGXziNBC74Kh
|
||||
U3W29/M7JO9LIUaJgK0LJbhgf0MuPxf7gX+PnxH5ImI5yfXRv0SSxeCq7377IkXP
|
||||
XS6Fo4H0MIHxMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcDAgYI
|
||||
KwYBBQUHAwEwDAYDVR0TAQH/BAIwADApBgNVHQ4EIgQg26hfNYiVwYRm7CQJvdOd
|
||||
NIOmG3t8vNwXCtktC782cf8wKwYDVR0jBCQwIoAgq5zUMtiEEu9wqPVm6z17av2D
|
||||
/UryATifImEVkzsYwqkwWgYDVR0RBFMwUYZPc3BpZmZlOi8vMTExMTExMTEtMjIy
|
||||
Mi0zMzMzLTQ0NDQtNTU1NTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2Rj
|
||||
MDEvc3ZjL3dlYjAKBggqhkjOPQQDAgNIADBFAiAzi8uBs+ApPfAZZm5eO/hhVZiv
|
||||
E8p84VKCqPeF3tFfoAIhANVkdSnp2AKU5T7SlJHmieu3DFNWCVpajlHJvf286J94
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIPOIj4BFS0fknG+uAVKZIWRpnzp7O3OKpBDgEmuml7lcoAoGCCqGSM49
|
||||
AwEHoUQDQgAEReO5RERl84jQQu+CoVN1tvfzOyTvSyFGiYCtCyW4YH9DLj8X+4F/
|
||||
j58R+SJiOcn10b9EksXgqu9++yJFz10uhQ==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,14 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICDDCCAbKgAwIBAgIBAjAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe
|
||||
Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMBAxDjAMBgNVBAMTBVZhdWx0
|
||||
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEAjGVnRy/7Q2SU4ePbKbsurRAHKYA
|
||||
CuA3r9QrowgZOr7yptF54shiobMIORpfKYkoYkhzL1lhWKI06BUJ4xuPd6OB/DCB
|
||||
+TAOBgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zApBgNVHQ4EIgQgqEc5
|
||||
ZrELD5ySxapbU+eRb+aEv1MEoCvjC0mCA1uJecMwKwYDVR0jBCQwIoAgqEc5ZrEL
|
||||
D5ySxapbU+eRb+aEv1MEoCvjC0mCA1uJecMwPwYDVR0RBDgwNoY0c3BpZmZlOi8v
|
||||
MTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1NTU1NTU1NTU1LmNvbnN1bDA9BgNV
|
||||
HR4BAf8EMzAxoC8wLYIrMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1NTU1NTU1
|
||||
NTU1LmNvbnN1bDAKBggqhkjOPQQDAgNIADBFAiEA6pBdeglhq//A7sYnYk85XL+3
|
||||
4IDrXrGN3KjC9qo3J9ICIDS9pEoTPWAWDfn1ccPafKVBrJm6KrmljcvymQ2QUDIZ
|
||||
-----END CERTIFICATE-----
|
||||
----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIKnuCctuvtyzf+M6B8jGqejG4T5o7NMRYjO2M3dZITCboAoGCCqGSM49
|
||||
AwEHoUQDQgAEAjGVnRy/7Q2SU4ePbKbsurRAHKYACuA3r9QrowgZOr7yptF54shi
|
||||
obMIORpfKYkoYkhzL1lhWKI06BUJ4xuPdw==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,13 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICBzCCAaygAwIBAgIBCDAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe
|
||||
Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMBAxDjAMBgNVBAMTBWNhY2hl
|
||||
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyB6D+Eqi/71EhUrBWlcZOV2vjS9Y
|
||||
xnUQ3jfH+QUZur7WOuGLnO7eArbAHcDbqKGyDWxlkZH04sGYOXaEW7UUd6OB9jCB
|
||||
8zAOBgNVHQ8BAf8EBAMCA7gwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMB
|
||||
MAwGA1UdEwEB/wQCMAAwKQYDVR0OBCIEIGapiHFxlbYbNKFlwdPMpKhIypvNZXo8
|
||||
k/OZLki/vurQMCsGA1UdIwQkMCKAIKhHOWaxCw+cksWqW1PnkW/mhL9TBKAr4wtJ
|
||||
ggNbiXnDMFwGA1UdEQRVMFOGUXNwaWZmZTovLzExMTExMTExLTIyMjItMzMzMy00
|
||||
NDQ0LTU1NTU1NTU1NTU1NS5jb25zdWwvbnMvZGVmYXVsdC9kYy9kYzAxL3N2Yy9j
|
||||
YWNoZTAKBggqhkjOPQQDAgNJADBGAiEA/vRLXbkigS6l89MxFk0RFE7Zo4vorv7s
|
||||
E1juCOsVJBICIQDXlpmYH9fPon6DYMyOxQttNjkuWbJgnPv7rPg+CllRyA==
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIEbQOv4odF2Tu8ZnJTJuytvOd2HOF9HxgGw5ei1pkP4moAoGCCqGSM49
|
||||
AwEHoUQDQgAEyB6D+Eqi/71EhUrBWlcZOV2vjS9YxnUQ3jfH+QUZur7WOuGLnO7e
|
||||
ArbAHcDbqKGyDWxlkZH04sGYOXaEW7UUdw==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,13 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICADCCAaagAwIBAgIBBzAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe
|
||||
Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMA0xCzAJBgNVBAMTAmRiMFkw
|
||||
EwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEFeB4DynO6IeKOE4zFLlBVFv+4HeWRvK3
|
||||
6cQ9L6v5uhLfdcYyqhT/QLbQ4R8ks1vUTTiq0XJsAGdkvkt71fiEl6OB8zCB8DAO
|
||||
BgNVHQ8BAf8EBAMCA7gwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMAwG
|
||||
A1UdEwEB/wQCMAAwKQYDVR0OBCIEIKjVz8n91cej8q6WpDNd0hwSMAE2ddY056PH
|
||||
hMfaBM6GMCsGA1UdIwQkMCKAIKhHOWaxCw+cksWqW1PnkW/mhL9TBKAr4wtJggNb
|
||||
iXnDMFkGA1UdEQRSMFCGTnNwaWZmZTovLzExMTExMTExLTIyMjItMzMzMy00NDQ0
|
||||
LTU1NTU1NTU1NTU1NS5jb25zdWwvbnMvZGVmYXVsdC9kYy9kYzAxL3N2Yy9kYjAK
|
||||
BggqhkjOPQQDAgNIADBFAiAdYkokbeZr7W32NhjcNoTMNwpz9CqJpK6Yzu4N6EJc
|
||||
pAIhALHpRM57zdiMouDOlhGPX5XKzbSl2AnBjFvbPqgFV09Z
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIHnzia+DNTFB7uYQEuWvLR2czGCuDfOTt1FfcBo1uBJioAoGCCqGSM49
|
||||
AwEHoUQDQgAEFeB4DynO6IeKOE4zFLlBVFv+4HeWRvK36cQ9L6v5uhLfdcYyqhT/
|
||||
QLbQ4R8ks1vUTTiq0XJsAGdkvkt71fiElw==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,13 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICAjCCAaigAwIBAgIBBjAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe
|
||||
Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMA4xDDAKBgNVBAMTA3dlYjBZ
|
||||
MBMGByqGSM49AgEGCCqGSM49AwEHA0IABM9XzxWFCa80uQDfJEGboUC15Yr+FwDp
|
||||
OemThalQxFpkL7gQSIgpzgGULIx+jCiu+clJ0QhbWT2dnS8vFUKq35qjgfQwgfEw
|
||||
DgYDVR0PAQH/BAQDAgO4MB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATAM
|
||||
BgNVHRMBAf8EAjAAMCkGA1UdDgQiBCCN+TKHPCOr48hxRCx4rqbWQg5QHkCSNzjZ
|
||||
qi1JGs13njArBgNVHSMEJDAigCCoRzlmsQsPnJLFqltT55Fv5oS/UwSgK+MLSYID
|
||||
W4l5wzBaBgNVHREEUzBRhk9zcGlmZmU6Ly8xMTExMTExMS0yMjIyLTMzMzMtNDQ0
|
||||
NC01NTU1NTU1NTU1NTUuY29uc3VsL25zL2RlZmF1bHQvZGMvZGMwMS9zdmMvd2Vi
|
||||
MAoGCCqGSM49BAMCA0gAMEUCIBd6gpL6E8rms5BU+cJeeyv0Rjc18edn2g3q2wLN
|
||||
r1zAAiEAv16whKwR0DeKkldGLDQIu9nCNvfDZrEWgywIBYbzLxY=
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,5 @@
|
|||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIOCMjjRexX3qHjixpRwLxggJd9yuskqUoPy8/MepafP+oAoGCCqGSM49
|
||||
AwEHoUQDQgAEz1fPFYUJrzS5AN8kQZuhQLXliv4XAOk56ZOFqVDEWmQvuBBIiCnO
|
||||
AZQsjH6MKK75yUnRCFtZPZ2dLy8VQqrfmg==
|
||||
-----END EC PRIVATE KEY-----
|
|
@ -0,0 +1,14 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICFjCCAbygAwIBAgIBAjAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg
|
||||
SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAQMQ4wDAYD
|
||||
VQQDEwVWYXVsdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABAIxlZ0cv+0NklOH
|
||||
j2ym7Lq0QBymAArgN6/UK6MIGTq+8qbReeLIYqGzCDkaXymJKGJIcy9ZYViiNOgV
|
||||
CeMbj3ejgfwwgfkwDgYDVR0PAQH/BAQDAgGGMA8GA1UdEwEB/wQFMAMBAf8wKQYD
|
||||
VR0OBCIEIKhHOWaxCw+cksWqW1PnkW/mhL9TBKAr4wtJggNbiXnDMCsGA1UdIwQk
|
||||
MCKAIKuc1DLYhBLvcKj1Zus9e2r9g/1K8gE4nyJhFZM7GMKpMD8GA1UdEQQ4MDaG
|
||||
NHNwaWZmZTovLzExMTExMTExLTIyMjItMzMzMy00NDQ0LTU1NTU1NTU1NTU1NS5j
|
||||
b25zdWwwPQYDVR0eAQH/BDMwMaAvMC2CKzExMTExMTExLTIyMjItMzMzMy00NDQ0
|
||||
LTU1NTU1NTU1NTU1NS5jb25zdWwwCgYIKoZIzj0EAwIDSAAwRQIgWWWj8/6SaY2y
|
||||
wzOtIphwZLewCuLMG6KG8uY4S7UsosgCIQDhCbT/LUKq/A21khQncBmM79ng9Gbx
|
||||
/4Zw8zbVmnZJKg==
|
||||
-----END CERTIFICATE-----
|
|
@ -0,0 +1,243 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/big"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// You can verify a given leaf with a given root using:
|
||||
//
|
||||
// $ openssl verify -verbose -CAfile ca2-ca-vault.cert.pem ca1-svc-db.cert.pem
|
||||
//
|
||||
// Note that to verify via the cross-signed intermediate, openssl requires it to
|
||||
// be bundled with the _root_ CA bundle and will ignore the cert if it's passed
|
||||
// with the subject. You can do that with:
|
||||
//
|
||||
// $ openssl verify -verbose -CAfile \
|
||||
// <(cat ca1-ca-consul-internal.cert.pem ca2-xc-by-ca1.cert.pem) \
|
||||
// ca2-svc-db.cert.pem
|
||||
// ca2-svc-db.cert.pem: OK
|
||||
//
|
||||
// Note that the same leaf and root without the intermediate should fail:
|
||||
//
|
||||
// $ openssl verify -verbose -CAfile ca1-ca-consul-internal.cert.pem ca2-svc-db.cert.pem
|
||||
// ca2-svc-db.cert.pem: CN = db
|
||||
// error 20 at 0 depth lookup:unable to get local issuer certificate
|
||||
//
|
||||
// NOTE: THIS IS A QUIRK OF OPENSSL; in Connect we will distribute the roots
|
||||
// alone and stable intermediates like the XC cert to the _leaf_.
|
||||
|
||||
var clusterID = "11111111-2222-3333-4444-555555555555"
|
||||
var cAs = []string{"Consul Internal", "Vault"}
|
||||
var services = []string{"web", "db", "cache"}
|
||||
var slugRe = regexp.MustCompile("[^a-zA-Z0-9]+")
|
||||
var serial int64
|
||||
|
||||
type caInfo struct {
|
||||
id int
|
||||
name string
|
||||
slug string
|
||||
uri *url.URL
|
||||
pk *ecdsa.PrivateKey
|
||||
cert *x509.Certificate
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Make CA certs
|
||||
caInfos := make(map[string]caInfo)
|
||||
var previousCA *caInfo
|
||||
for idx, name := range cAs {
|
||||
ca := caInfo{
|
||||
id: idx + 1,
|
||||
name: name,
|
||||
slug: strings.ToLower(slugRe.ReplaceAllString(name, "-")),
|
||||
}
|
||||
pk, err := makePK(fmt.Sprintf("ca%d-ca-%s.key.pem", ca.id, ca.slug))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
ca.pk = pk
|
||||
caURI, err := url.Parse(fmt.Sprintf("spiffe://%s.consul", clusterID))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
ca.uri = caURI
|
||||
cert, err := makeCACert(ca, previousCA)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
ca.cert = cert
|
||||
caInfos[name] = ca
|
||||
previousCA = &ca
|
||||
}
|
||||
|
||||
// For each CA, make a leaf cert for each service
|
||||
for _, ca := range caInfos {
|
||||
for _, svc := range services {
|
||||
_, err := makeLeafCert(ca, svc)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func makePK(path string) (*ecdsa.PrivateKey, error) {
|
||||
log.Printf("Writing PK file: %s", path)
|
||||
priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bs, err := x509.MarshalECPrivateKey(priv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = writePEM(path, "EC PRIVATE KEY", bs)
|
||||
return priv, nil
|
||||
}
|
||||
|
||||
func makeCACert(ca caInfo, previousCA *caInfo) (*x509.Certificate, error) {
|
||||
path := fmt.Sprintf("ca%d-ca-%s.cert.pem", ca.id, ca.slug)
|
||||
log.Printf("Writing CA cert file: %s", path)
|
||||
serial++
|
||||
subj := pkix.Name{
|
||||
CommonName: ca.name,
|
||||
}
|
||||
template := x509.Certificate{
|
||||
SerialNumber: big.NewInt(serial),
|
||||
Subject: subj,
|
||||
// New in go 1.10
|
||||
URIs: []*url.URL{ca.uri},
|
||||
// Add DNS name constraint
|
||||
PermittedDNSDomainsCritical: true,
|
||||
PermittedDNSDomains: []string{ca.uri.Hostname()},
|
||||
SignatureAlgorithm: x509.ECDSAWithSHA256,
|
||||
BasicConstraintsValid: true,
|
||||
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
|
||||
IsCA: true,
|
||||
NotAfter: time.Now().Add(10 * 365 * 24 * time.Hour),
|
||||
NotBefore: time.Now(),
|
||||
AuthorityKeyId: keyID(&ca.pk.PublicKey),
|
||||
SubjectKeyId: keyID(&ca.pk.PublicKey),
|
||||
}
|
||||
bs, err := x509.CreateCertificate(rand.Reader, &template, &template,
|
||||
&ca.pk.PublicKey, ca.pk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = writePEM(path, "CERTIFICATE", bs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cert, err := x509.ParseCertificate(bs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if previousCA != nil {
|
||||
// Also create cross-signed cert as we would use during rotation between
|
||||
// previous CA and this one.
|
||||
template.AuthorityKeyId = keyID(&previousCA.pk.PublicKey)
|
||||
bs, err := x509.CreateCertificate(rand.Reader, &template,
|
||||
previousCA.cert, &ca.pk.PublicKey, previousCA.pk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("ca%d-xc-by-ca%d.cert.pem", ca.id, previousCA.id)
|
||||
err = writePEM(path, "CERTIFICATE", bs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return cert, err
|
||||
}
|
||||
|
||||
func keyID(pub *ecdsa.PublicKey) []byte {
|
||||
// This is not standard; RFC allows any unique identifier as long as they
|
||||
// match in subject/authority chains but suggests specific hashing of DER
|
||||
// bytes of public key including DER tags. I can't be bothered to do esp.
|
||||
// since ECDSA keys don't have a handy way to marshal the publick key alone.
|
||||
h := sha256.New()
|
||||
h.Write(pub.X.Bytes())
|
||||
h.Write(pub.Y.Bytes())
|
||||
return h.Sum([]byte{})
|
||||
}
|
||||
|
||||
func makeLeafCert(ca caInfo, svc string) (*x509.Certificate, error) {
|
||||
svcURI := ca.uri
|
||||
svcURI.Path = "/ns/default/dc/dc01/svc/" + svc
|
||||
|
||||
keyPath := fmt.Sprintf("ca%d-svc-%s.key.pem", ca.id, svc)
|
||||
cPath := fmt.Sprintf("ca%d-svc-%s.cert.pem", ca.id, svc)
|
||||
|
||||
pk, err := makePK(keyPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Printf("Writing Service Cert: %s", cPath)
|
||||
|
||||
serial++
|
||||
subj := pkix.Name{
|
||||
CommonName: svc,
|
||||
}
|
||||
template := x509.Certificate{
|
||||
SerialNumber: big.NewInt(serial),
|
||||
Subject: subj,
|
||||
// New in go 1.10
|
||||
URIs: []*url.URL{svcURI},
|
||||
SignatureAlgorithm: x509.ECDSAWithSHA256,
|
||||
BasicConstraintsValid: true,
|
||||
KeyUsage: x509.KeyUsageDataEncipherment |
|
||||
x509.KeyUsageKeyAgreement | x509.KeyUsageDigitalSignature |
|
||||
x509.KeyUsageKeyEncipherment,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{
|
||||
x509.ExtKeyUsageClientAuth,
|
||||
x509.ExtKeyUsageServerAuth,
|
||||
},
|
||||
NotAfter: time.Now().Add(10 * 365 * 24 * time.Hour),
|
||||
NotBefore: time.Now(),
|
||||
AuthorityKeyId: keyID(&ca.pk.PublicKey),
|
||||
SubjectKeyId: keyID(&pk.PublicKey),
|
||||
}
|
||||
bs, err := x509.CreateCertificate(rand.Reader, &template, ca.cert,
|
||||
&pk.PublicKey, ca.pk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = writePEM(cPath, "CERTIFICATE", bs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return x509.ParseCertificate(bs)
|
||||
}
|
||||
|
||||
func writePEM(name, typ string, bs []byte) error {
|
||||
f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
return pem.Encode(f, &pem.Block{Type: typ, Bytes: bs})
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package connect
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// testDataDir is a janky temporary hack to allow use of these methods from
|
||||
// proxy package. We need to revisit where all this lives since it logically
|
||||
// overlaps with consul/agent in Mitchell's PR and that one generates certs on
|
||||
// the fly which will make this unecessary but I want to get things working for
|
||||
// now with what I've got :). This wonderful heap kinda-sorta gets the path
|
||||
// relative to _this_ file so it works even if the Test* method is being called
|
||||
// from a test binary in another package dir.
|
||||
func testDataDir() string {
|
||||
_, filename, _, ok := runtime.Caller(0)
|
||||
if !ok {
|
||||
panic("no caller information")
|
||||
}
|
||||
return path.Dir(filename) + "/testdata"
|
||||
}
|
||||
|
||||
// TestCAPool returns an *x509.CertPool containing the named CA certs from the
|
||||
// testdata dir.
|
||||
func TestCAPool(t testing.T, caNames ...string) *x509.CertPool {
|
||||
t.Helper()
|
||||
pool := x509.NewCertPool()
|
||||
for _, name := range caNames {
|
||||
certs, err := filepath.Glob(testDataDir() + "/" + name + "-ca-*.cert.pem")
|
||||
require.Nil(t, err)
|
||||
for _, cert := range certs {
|
||||
caPem, err := ioutil.ReadFile(cert)
|
||||
require.Nil(t, err)
|
||||
pool.AppendCertsFromPEM(caPem)
|
||||
}
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
// TestSvcKeyPair returns an tls.Certificate containing both cert and private
|
||||
// key for a given service under a given CA from the testdata dir.
|
||||
func TestSvcKeyPair(t testing.T, ca, name string) tls.Certificate {
|
||||
t.Helper()
|
||||
prefix := fmt.Sprintf(testDataDir()+"/%s-svc-%s", ca, name)
|
||||
cert, err := tls.LoadX509KeyPair(prefix+".cert.pem", prefix+".key.pem")
|
||||
require.Nil(t, err)
|
||||
return cert
|
||||
}
|
||||
|
||||
// TestTLSConfig returns a *tls.Config suitable for use during tests.
|
||||
func TestTLSConfig(t testing.T, ca, svc string) *tls.Config {
|
||||
t.Helper()
|
||||
return &tls.Config{
|
||||
Certificates: []tls.Certificate{TestSvcKeyPair(t, ca, svc)},
|
||||
MinVersion: tls.VersionTLS12,
|
||||
RootCAs: TestCAPool(t, ca),
|
||||
ClientCAs: TestCAPool(t, ca),
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
// In real life we'll need to do this too since otherwise Go will attempt to
|
||||
// verify DNS names match DNS SAN/CN which we don't want, but we'll hook
|
||||
// VerifyPeerCertificates and do our own x509 path validation as well as
|
||||
// AuthZ upcall. For now we are just testing the basic proxy mechanism so
|
||||
// this is fine.
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
}
|
||||
|
||||
// TestAuther is a simple Auther implementation that does nothing but what you
|
||||
// tell it to!
|
||||
type TestAuther struct {
|
||||
// Return is the value returned from an Auth() call. Set it to nil to have all
|
||||
// certificates unconditionally accepted or a value to have them fail.
|
||||
Return error
|
||||
}
|
||||
|
||||
// Auth implements Auther
|
||||
func (a *TestAuther) Auth(rawCerts [][]byte,
|
||||
verifiedChains [][]*x509.Certificate) error {
|
||||
return a.Return
|
||||
}
|
|
@ -0,0 +1,124 @@
|
|||
package connect
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// defaultTLSConfig returns the standard config for connect clients and servers.
|
||||
func defaultTLSConfig() *tls.Config {
|
||||
serverAuther := &ServerAuther{}
|
||||
return &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
// We don't have access to go internals that decide if AES hardware
|
||||
// acceleration is available in order to prefer CHA CHA if not. So let's
|
||||
// just always prefer AES for now. We can look into doing something uglier
|
||||
// later like using an external lib for AES checking if it seems important.
|
||||
// https://github.com/golang/go/blob/df91b8044dbe790c69c16058330f545be069cc1f/src/crypto/tls/common.go#L919:14
|
||||
CipherSuites: []uint16{
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
|
||||
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
|
||||
},
|
||||
// We have to set this since otherwise Go will attempt to verify DNS names
|
||||
// match DNS SAN/CN which we don't want. We hook up VerifyPeerCertificate to
|
||||
// do our own path validation as well as Connect AuthZ.
|
||||
InsecureSkipVerify: true,
|
||||
// By default auth as if we are a server. Clients need to override this with
|
||||
// an Auther that is performs correct validation of the server identity they
|
||||
// intended to connect to.
|
||||
VerifyPeerCertificate: serverAuther.Auth,
|
||||
}
|
||||
}
|
||||
|
||||
// ReloadableTLSConfig exposes a tls.Config that can have it's certificates
|
||||
// reloaded. This works by
|
||||
type ReloadableTLSConfig struct {
|
||||
mu sync.Mutex
|
||||
|
||||
// cfg is the current config to use for new connections
|
||||
cfg *tls.Config
|
||||
}
|
||||
|
||||
// NewReloadableTLSConfig returns a reloadable config currently set to base. The
|
||||
// Auther used to verify certificates for incoming connections on a Server will
|
||||
// just be copied from the VerifyPeerCertificate passed. Clients will need to
|
||||
// pass a specific Auther instance when they call TLSConfig that is configured
|
||||
// to perform the necessary validation of the server's identity.
|
||||
func NewReloadableTLSConfig(base *tls.Config) *ReloadableTLSConfig {
|
||||
return &ReloadableTLSConfig{cfg: base}
|
||||
}
|
||||
|
||||
// ServerTLSConfig returns a *tls.Config that will dynamically load certs for
|
||||
// each inbound connection via the GetConfigForClient callback.
|
||||
func (c *ReloadableTLSConfig) ServerTLSConfig() *tls.Config {
|
||||
// Setup the basic one with current params even though we will be using
|
||||
// different config for each new conn.
|
||||
c.mu.Lock()
|
||||
base := c.cfg
|
||||
c.mu.Unlock()
|
||||
|
||||
// Dynamically fetch the current config for each new inbound connection
|
||||
base.GetConfigForClient = func(info *tls.ClientHelloInfo) (*tls.Config, error) {
|
||||
return c.TLSConfig(nil), nil
|
||||
}
|
||||
|
||||
return base
|
||||
}
|
||||
|
||||
// TLSConfig returns the current value for the config. It is safe to call from
|
||||
// any goroutine. The passed Auther is inserted into the config's
|
||||
// VerifyPeerCertificate. Passing a nil Auther will leave the default one in the
|
||||
// base config
|
||||
func (c *ReloadableTLSConfig) TLSConfig(auther Auther) *tls.Config {
|
||||
c.mu.Lock()
|
||||
cfgCopy := c.cfg
|
||||
c.mu.Unlock()
|
||||
if auther != nil {
|
||||
cfgCopy.VerifyPeerCertificate = auther.Auth
|
||||
}
|
||||
return cfgCopy
|
||||
}
|
||||
|
||||
// SetTLSConfig sets the config used for future connections. It is safe to call
|
||||
// from any goroutine.
|
||||
func (c *ReloadableTLSConfig) SetTLSConfig(cfg *tls.Config) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.cfg = cfg
|
||||
return nil
|
||||
}
|
||||
|
||||
// devTLSConfigFromFiles returns a default TLS Config but with certs and CAs
|
||||
// based on local files for dev.
|
||||
func devTLSConfigFromFiles(caFile, certFile,
|
||||
keyFile string) (*tls.Config, error) {
|
||||
|
||||
roots := x509.NewCertPool()
|
||||
|
||||
bs, err := ioutil.ReadFile(caFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
roots.AppendCertsFromPEM(bs)
|
||||
|
||||
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := defaultTLSConfig()
|
||||
|
||||
cfg.Certificates = []tls.Certificate{cert}
|
||||
cfg.RootCAs = roots
|
||||
cfg.ClientCAs = roots
|
||||
|
||||
return cfg, nil
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package connect
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestReloadableTLSConfig(t *testing.T) {
|
||||
base := TestTLSConfig(t, "ca1", "web")
|
||||
|
||||
c := NewReloadableTLSConfig(base)
|
||||
|
||||
a := &TestAuther{
|
||||
Return: nil,
|
||||
}
|
||||
|
||||
// The dynamic config should be the one we loaded, but with the passed auther
|
||||
expect := base
|
||||
expect.VerifyPeerCertificate = a.Auth
|
||||
require.Equal(t, base, c.TLSConfig(a))
|
||||
|
||||
// The server config should return same too for new connections
|
||||
serverCfg := c.ServerTLSConfig()
|
||||
require.NotNil(t, serverCfg.GetConfigForClient)
|
||||
got, err := serverCfg.GetConfigForClient(&tls.ClientHelloInfo{})
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, base, got)
|
||||
|
||||
// Now change the config as if we just rotated to a new CA
|
||||
new := TestTLSConfig(t, "ca2", "web")
|
||||
err = c.SetTLSConfig(new)
|
||||
require.Nil(t, err)
|
||||
|
||||
// The dynamic config should be the one we loaded (with same auther due to nil)
|
||||
require.Equal(t, new, c.TLSConfig(nil))
|
||||
|
||||
// The server config should return same too for new connections
|
||||
serverCfg = c.ServerTLSConfig()
|
||||
require.NotNil(t, serverCfg.GetConfigForClient)
|
||||
got, err = serverCfg.GetConfigForClient(&tls.ClientHelloInfo{})
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, new, got)
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/hcl"
|
||||
)
|
||||
|
||||
// Config is the publicly configurable state for an entire proxy instance. It's
|
||||
// mostly used as the format for the local-file config mode which is mostly for
|
||||
// dev/testing. In normal use, different parts of this config are pulled from
|
||||
// different locations (e.g. command line, agent config endpoint, agent
|
||||
// certificate endpoints).
|
||||
type Config struct {
|
||||
// ProxyID is the identifier for this proxy as registered in Consul. It's only
|
||||
// guaranteed to be unique per agent.
|
||||
ProxyID string `json:"proxy_id" hcl:"proxy_id"`
|
||||
|
||||
// Token is the authentication token provided for queries to the local agent.
|
||||
Token string `json:"token" hcl:"token"`
|
||||
|
||||
// ProxiedServiceName is the name of the service this proxy is representing.
|
||||
ProxiedServiceName string `json:"proxied_service_name" hcl:"proxied_service_name"`
|
||||
|
||||
// ProxiedServiceNamespace is the namespace of the service this proxy is
|
||||
// representing.
|
||||
ProxiedServiceNamespace string `json:"proxied_service_namespace" hcl:"proxied_service_namespace"`
|
||||
|
||||
// PublicListener configures the mTLS listener.
|
||||
PublicListener PublicListenerConfig `json:"public_listener" hcl:"public_listener"`
|
||||
|
||||
// Upstreams configures outgoing proxies for remote connect services.
|
||||
Upstreams []UpstreamConfig `json:"upstreams" hcl:"upstreams"`
|
||||
|
||||
// DevCAFile allows passing the file path to PEM encoded root certificate
|
||||
// bundle to be used in development instead of the ones supplied by Connect.
|
||||
DevCAFile string `json:"dev_ca_file" hcl:"dev_ca_file"`
|
||||
|
||||
// DevServiceCertFile allows passing the file path to PEM encoded service
|
||||
// certificate (client and server) to be used in development instead of the
|
||||
// ones supplied by Connect.
|
||||
DevServiceCertFile string `json:"dev_service_cert_file" hcl:"dev_service_cert_file"`
|
||||
|
||||
// DevServiceKeyFile allows passing the file path to PEM encoded service
|
||||
// private key to be used in development instead of the ones supplied by
|
||||
// Connect.
|
||||
DevServiceKeyFile string `json:"dev_service_key_file" hcl:"dev_service_key_file"`
|
||||
}
|
||||
|
||||
// ConfigWatcher is a simple interface to allow dynamic configurations from
|
||||
// plugggable sources.
|
||||
type ConfigWatcher interface {
|
||||
// Watch returns a channel that will deliver new Configs if something external
|
||||
// provokes it.
|
||||
Watch() <-chan *Config
|
||||
}
|
||||
|
||||
// StaticConfigWatcher is a simple ConfigWatcher that delivers a static Config
|
||||
// once and then never changes it.
|
||||
type StaticConfigWatcher struct {
|
||||
ch chan *Config
|
||||
}
|
||||
|
||||
// NewStaticConfigWatcher returns a ConfigWatcher for a config that never
|
||||
// changes. It assumes only one "watcher" will ever call Watch. The config is
|
||||
// delivered on the first call but will never be delivered again to allow
|
||||
// callers to call repeatedly (e.g. select in a loop).
|
||||
func NewStaticConfigWatcher(cfg *Config) *StaticConfigWatcher {
|
||||
sc := &StaticConfigWatcher{
|
||||
// Buffer it so we can queue up the config for first delivery.
|
||||
ch: make(chan *Config, 1),
|
||||
}
|
||||
sc.ch <- cfg
|
||||
return sc
|
||||
}
|
||||
|
||||
// Watch implements ConfigWatcher on a static configuration for compatibility.
|
||||
// It returns itself on the channel once and then leaves it open.
|
||||
func (sc *StaticConfigWatcher) Watch() <-chan *Config {
|
||||
return sc.ch
|
||||
}
|
||||
|
||||
// ParseConfigFile parses proxy configuration form a file for local dev.
|
||||
func ParseConfigFile(filename string) (*Config, error) {
|
||||
bs, err := ioutil.ReadFile(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var cfg Config
|
||||
|
||||
err = hcl.Unmarshal(bs, &cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// AgentConfigWatcher watches the local Consul agent for proxy config changes.
|
||||
type AgentConfigWatcher struct {
|
||||
client *api.Client
|
||||
}
|
||||
|
||||
// Watch implements ConfigWatcher.
|
||||
func (w *AgentConfigWatcher) Watch() <-chan *Config {
|
||||
watch := make(chan *Config)
|
||||
// TODO implement me
|
||||
return watch
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseConfigFile(t *testing.T) {
|
||||
cfg, err := ParseConfigFile("testdata/config-kitchensink.hcl")
|
||||
require.Nil(t, err)
|
||||
|
||||
expect := &Config{
|
||||
ProxyID: "foo",
|
||||
Token: "11111111-2222-3333-4444-555555555555",
|
||||
ProxiedServiceName: "web",
|
||||
ProxiedServiceNamespace: "default",
|
||||
PublicListener: PublicListenerConfig{
|
||||
BindAddress: ":9999",
|
||||
LocalServiceAddress: "127.0.0.1:5000",
|
||||
LocalConnectTimeoutMs: 1000,
|
||||
HandshakeTimeoutMs: 5000,
|
||||
},
|
||||
Upstreams: []UpstreamConfig{
|
||||
{
|
||||
LocalBindAddress: "127.0.0.1:6000",
|
||||
DestinationName: "db",
|
||||
DestinationNamespace: "default",
|
||||
DestinationType: "service",
|
||||
ConnectTimeoutMs: 10000,
|
||||
},
|
||||
{
|
||||
LocalBindAddress: "127.0.0.1:6001",
|
||||
DestinationName: "geo-cache",
|
||||
DestinationNamespace: "default",
|
||||
DestinationType: "prepared_query",
|
||||
ConnectTimeoutMs: 10000,
|
||||
},
|
||||
},
|
||||
DevCAFile: "connect/testdata/ca1-ca-consul-internal.cert.pem",
|
||||
DevServiceCertFile: "connect/testdata/ca1-svc-web.cert.pem",
|
||||
DevServiceKeyFile: "connect/testdata/ca1-svc-web.key.pem",
|
||||
}
|
||||
|
||||
require.Equal(t, expect, cfg)
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// Conn represents a single proxied TCP connection.
|
||||
type Conn struct {
|
||||
src, dst net.Conn
|
||||
stopping int32
|
||||
}
|
||||
|
||||
// NewConn returns a conn joining the two given net.Conn
|
||||
func NewConn(src, dst net.Conn) *Conn {
|
||||
return &Conn{
|
||||
src: src,
|
||||
dst: dst,
|
||||
stopping: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes both connections.
|
||||
func (c *Conn) Close() {
|
||||
atomic.StoreInt32(&c.stopping, 1)
|
||||
c.src.Close()
|
||||
c.dst.Close()
|
||||
}
|
||||
|
||||
// CopyBytes will continuously copy bytes in both directions between src and dst
|
||||
// until either connection is closed.
|
||||
func (c *Conn) CopyBytes() error {
|
||||
defer c.Close()
|
||||
|
||||
go func() {
|
||||
// Need this since Copy is only guaranteed to stop when it's source reader
|
||||
// (second arg) hits EOF or error but either conn might close first possibly
|
||||
// causing this goroutine to exit but not the outer one. See TestSc
|
||||
//defer c.Close()
|
||||
io.Copy(c.dst, c.src)
|
||||
}()
|
||||
_, err := io.Copy(c.src, c.dst)
|
||||
if atomic.LoadInt32(&c.stopping) == 1 {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,119 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// testConnSetup listens on a random TCP port and passes the accepted net.Conn
|
||||
// back to test code on returned channel. It then creates a source and
|
||||
// destination Conn. And a cleanup func
|
||||
func testConnSetup(t *testing.T) (net.Conn, net.Conn, func()) {
|
||||
t.Helper()
|
||||
|
||||
l, err := net.Listen("tcp", "localhost:0")
|
||||
require.Nil(t, err)
|
||||
|
||||
ch := make(chan net.Conn, 1)
|
||||
go func(ch chan net.Conn) {
|
||||
src, err := l.Accept()
|
||||
require.Nil(t, err)
|
||||
ch <- src
|
||||
}(ch)
|
||||
|
||||
dst, err := net.Dial("tcp", l.Addr().String())
|
||||
require.Nil(t, err)
|
||||
|
||||
src := <-ch
|
||||
|
||||
stopper := func() {
|
||||
l.Close()
|
||||
src.Close()
|
||||
dst.Close()
|
||||
}
|
||||
|
||||
return src, dst, stopper
|
||||
}
|
||||
|
||||
func TestConn(t *testing.T) {
|
||||
src, dst, stop := testConnSetup(t)
|
||||
defer stop()
|
||||
|
||||
c := NewConn(src, dst)
|
||||
|
||||
retCh := make(chan error, 1)
|
||||
go func() {
|
||||
retCh <- c.CopyBytes()
|
||||
}()
|
||||
|
||||
srcR := bufio.NewReader(src)
|
||||
dstR := bufio.NewReader(dst)
|
||||
|
||||
_, err := src.Write([]byte("ping 1\n"))
|
||||
require.Nil(t, err)
|
||||
_, err = dst.Write([]byte("ping 2\n"))
|
||||
require.Nil(t, err)
|
||||
|
||||
got, err := dstR.ReadString('\n')
|
||||
require.Equal(t, "ping 1\n", got)
|
||||
|
||||
got, err = srcR.ReadString('\n')
|
||||
require.Equal(t, "ping 2\n", got)
|
||||
|
||||
_, err = src.Write([]byte("pong 1\n"))
|
||||
require.Nil(t, err)
|
||||
_, err = dst.Write([]byte("pong 2\n"))
|
||||
require.Nil(t, err)
|
||||
|
||||
got, err = dstR.ReadString('\n')
|
||||
require.Equal(t, "pong 1\n", got)
|
||||
|
||||
got, err = srcR.ReadString('\n')
|
||||
require.Equal(t, "pong 2\n", got)
|
||||
|
||||
c.Close()
|
||||
|
||||
ret := <-retCh
|
||||
require.Nil(t, ret, "Close() should not cause error return")
|
||||
}
|
||||
|
||||
func TestConnSrcClosing(t *testing.T) {
|
||||
src, dst, stop := testConnSetup(t)
|
||||
defer stop()
|
||||
|
||||
c := NewConn(src, dst)
|
||||
retCh := make(chan error, 1)
|
||||
go func() {
|
||||
retCh <- c.CopyBytes()
|
||||
}()
|
||||
|
||||
// If we close the src conn, we expect CopyBytes to return and src to be
|
||||
// closed too. No good way to assert that the conn is closed really other than
|
||||
// assume the retCh receive will hand unless CopyBytes exits and that
|
||||
// CopyBytes defers Closing both. i.e. if this test doesn't time out it's
|
||||
// good!
|
||||
src.Close()
|
||||
<-retCh
|
||||
}
|
||||
|
||||
func TestConnDstClosing(t *testing.T) {
|
||||
src, dst, stop := testConnSetup(t)
|
||||
defer stop()
|
||||
|
||||
c := NewConn(src, dst)
|
||||
retCh := make(chan error, 1)
|
||||
go func() {
|
||||
retCh <- c.CopyBytes()
|
||||
}()
|
||||
|
||||
// If we close the dst conn, we expect CopyBytes to return and src to be
|
||||
// closed too. No good way to assert that the conn is closed really other than
|
||||
// assume the retCh receive will hand unless CopyBytes exits and that
|
||||
// CopyBytes defers Closing both. i.e. if this test doesn't time out it's
|
||||
// good!
|
||||
dst.Close()
|
||||
<-retCh
|
||||
}
|
|
@ -0,0 +1,140 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrExists is the error returned when adding a proxy that exists already.
|
||||
ErrExists = errors.New("proxy with that name already exists")
|
||||
// ErrNotExist is the error returned when removing a proxy that doesn't exist.
|
||||
ErrNotExist = errors.New("proxy with that name doesn't exist")
|
||||
)
|
||||
|
||||
// Manager implements the logic for configuring and running a set of proxiers.
|
||||
// Typically it's used to run one PublicListener and zero or more Upstreams.
|
||||
type Manager struct {
|
||||
ch chan managerCmd
|
||||
|
||||
// stopped is used to signal the caller of StopAll when the run loop exits
|
||||
// after stopping all runners. It's only closed.
|
||||
stopped chan struct{}
|
||||
|
||||
// runners holds the currently running instances. It should only by accessed
|
||||
// from within the `run` goroutine.
|
||||
runners map[string]*Runner
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
type managerCmd struct {
|
||||
name string
|
||||
p Proxier
|
||||
errCh chan error
|
||||
}
|
||||
|
||||
// NewManager creates a manager of proxier instances.
|
||||
func NewManager() *Manager {
|
||||
return NewManagerWithLogger(log.New(os.Stdout, "", log.LstdFlags))
|
||||
}
|
||||
|
||||
// NewManagerWithLogger creates a manager of proxier instances with the
|
||||
// specified logger.
|
||||
func NewManagerWithLogger(logger *log.Logger) *Manager {
|
||||
m := &Manager{
|
||||
ch: make(chan managerCmd),
|
||||
stopped: make(chan struct{}),
|
||||
runners: make(map[string]*Runner),
|
||||
logger: logger,
|
||||
}
|
||||
go m.run()
|
||||
return m
|
||||
}
|
||||
|
||||
// RunProxier starts a new Proxier instance in the manager. It is safe to call
|
||||
// from separate goroutines. If there is already a running proxy with the same
|
||||
// name it returns ErrExists.
|
||||
func (m *Manager) RunProxier(name string, p Proxier) error {
|
||||
cmd := managerCmd{
|
||||
name: name,
|
||||
p: p,
|
||||
errCh: make(chan error),
|
||||
}
|
||||
m.ch <- cmd
|
||||
return <-cmd.errCh
|
||||
}
|
||||
|
||||
// StopProxier stops a Proxier instance by name. It is safe to call from
|
||||
// separate goroutines. If the instance with that name doesn't exist it returns
|
||||
// ErrNotExist.
|
||||
func (m *Manager) StopProxier(name string) error {
|
||||
cmd := managerCmd{
|
||||
name: name,
|
||||
p: nil,
|
||||
errCh: make(chan error),
|
||||
}
|
||||
m.ch <- cmd
|
||||
return <-cmd.errCh
|
||||
}
|
||||
|
||||
// StopAll shuts down the manager instance and stops all running proxies. It is
|
||||
// safe to call from any goroutine but must only be called once.
|
||||
func (m *Manager) StopAll() error {
|
||||
close(m.ch)
|
||||
<-m.stopped
|
||||
return nil
|
||||
}
|
||||
|
||||
// run is the main manager processing loop. It keeps all actions in a single
|
||||
// goroutine triggered by channel commands to keep it simple to reason about
|
||||
// lifecycle events for each proxy.
|
||||
func (m *Manager) run() {
|
||||
defer close(m.stopped)
|
||||
|
||||
// range over channel blocks and loops on each message received until channel
|
||||
// is closed.
|
||||
for cmd := range m.ch {
|
||||
if cmd.p == nil {
|
||||
m.remove(&cmd)
|
||||
} else {
|
||||
m.add(&cmd)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutting down, Stop all the runners
|
||||
for _, r := range m.runners {
|
||||
r.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// add the named proxier instance and stop it. Should only be called from the
|
||||
// run loop.
|
||||
func (m *Manager) add(cmd *managerCmd) {
|
||||
// Check existing
|
||||
if _, ok := m.runners[cmd.name]; ok {
|
||||
cmd.errCh <- ErrExists
|
||||
return
|
||||
}
|
||||
|
||||
// Start new runner
|
||||
r := NewRunnerWithLogger(cmd.name, cmd.p, m.logger)
|
||||
m.runners[cmd.name] = r
|
||||
go r.Listen()
|
||||
cmd.errCh <- nil
|
||||
}
|
||||
|
||||
// remove the named proxier instance and stop it. Should only be called from the
|
||||
// run loop.
|
||||
func (m *Manager) remove(cmd *managerCmd) {
|
||||
// Fetch proxier by name
|
||||
r, ok := m.runners[cmd.name]
|
||||
if !ok {
|
||||
cmd.errCh <- ErrNotExist
|
||||
return
|
||||
}
|
||||
err := r.Stop()
|
||||
delete(m.runners, cmd.name)
|
||||
cmd.errCh <- err
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestManager(t *testing.T) {
|
||||
m := NewManager()
|
||||
|
||||
addrs := TestLocalBindAddrs(t, 3)
|
||||
|
||||
for i := 0; i < len(addrs); i++ {
|
||||
name := fmt.Sprintf("proxier-%d", i)
|
||||
// Run proxy
|
||||
err := m.RunProxier(name, &TestProxier{
|
||||
Addr: addrs[i],
|
||||
Prefix: name + ": ",
|
||||
})
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
// Make sure each one is echoing correctly now all are running
|
||||
for i := 0; i < len(addrs); i++ {
|
||||
conn, err := net.Dial("tcp", addrs[i])
|
||||
require.Nil(t, err)
|
||||
TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i))
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Stop first proxier
|
||||
err := m.StopProxier("proxier-0")
|
||||
require.Nil(t, err)
|
||||
|
||||
// We should fail to dial it now. Note that Runner.Stop is synchronous so
|
||||
// there should be a strong guarantee that it's stopped listening by now.
|
||||
_, err = net.Dial("tcp", addrs[0])
|
||||
require.NotNil(t, err)
|
||||
|
||||
// Rest of proxiers should still be running
|
||||
for i := 1; i < len(addrs); i++ {
|
||||
conn, err := net.Dial("tcp", addrs[i])
|
||||
require.Nil(t, err)
|
||||
TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i))
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// Stop non-existent proxier should fail
|
||||
err = m.StopProxier("foo")
|
||||
require.Equal(t, ErrNotExist, err)
|
||||
|
||||
// Add already-running proxier should fail
|
||||
err = m.RunProxier("proxier-1", &TestProxier{})
|
||||
require.Equal(t, ErrExists, err)
|
||||
|
||||
// But rest should stay running
|
||||
for i := 1; i < len(addrs); i++ {
|
||||
conn, err := net.Dial("tcp", addrs[i])
|
||||
require.Nil(t, err)
|
||||
TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i))
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
// StopAll should stop everything
|
||||
err = m.StopAll()
|
||||
require.Nil(t, err)
|
||||
|
||||
// Verify failures
|
||||
for i := 0; i < len(addrs); i++ {
|
||||
_, err = net.Dial("tcp", addrs[i])
|
||||
require.NotNilf(t, err, "proxier-%d should not be running", i)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
)
|
||||
|
||||
// ErrStopped is returned for operations on a proxy that is stopped
|
||||
var ErrStopped = errors.New("stopped")
|
||||
|
||||
// ErrStopping is returned for operations on a proxy that is stopping
|
||||
var ErrStopping = errors.New("stopping")
|
||||
|
||||
// Proxier is an interface for managing different proxy implementations in a
|
||||
// standard way. We have at least two different types of Proxier implementations
|
||||
// needed: one for the incoming mTLS -> local proxy and another for each
|
||||
// "upstream" service the app needs to talk out to (which listens locally and
|
||||
// performs service discovery to find a suitable remote service).
|
||||
type Proxier interface {
|
||||
// Listener returns a net.Listener that is open and ready for use, the Proxy
|
||||
// manager will arrange accepting new connections from it and passing them to
|
||||
// the handler method.
|
||||
Listener() (net.Listener, error)
|
||||
|
||||
// HandleConn is called for each incoming connection accepted by the listener.
|
||||
// It is called in it's own goroutine and should run until it hits an error.
|
||||
// When stopping the Proxier, the manager will simply close the conn provided
|
||||
// and expects an error to be eventually returned. Any time spent not blocked
|
||||
// on the passed conn (for example doing service discovery) should therefore
|
||||
// be time-bound so that shutdown can't stall forever.
|
||||
HandleConn(conn net.Conn) error
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/connect"
|
||||
)
|
||||
|
||||
// Proxy implements the built-in connect proxy.
|
||||
type Proxy struct {
|
||||
proxyID, token string
|
||||
|
||||
connect connect.Client
|
||||
manager *Manager
|
||||
cfgWatch ConfigWatcher
|
||||
cfg *Config
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// NewFromConfigFile returns a Proxy instance configured just from a local file.
|
||||
// This is intended mostly for development and bypasses the normal mechanisms
|
||||
// for fetching config and certificates from the local agent.
|
||||
func NewFromConfigFile(client *api.Client, filename string,
|
||||
logger *log.Logger) (*Proxy, error) {
|
||||
cfg, err := ParseConfigFile(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
connect, err := connect.NewInsecureDevClientWithLocalCerts(client,
|
||||
cfg.DevCAFile, cfg.DevServiceCertFile, cfg.DevServiceKeyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &Proxy{
|
||||
proxyID: cfg.ProxyID,
|
||||
connect: connect,
|
||||
manager: NewManagerWithLogger(logger),
|
||||
cfgWatch: NewStaticConfigWatcher(cfg),
|
||||
logger: logger,
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// New returns a Proxy with the given id, consuming the provided (configured)
|
||||
// agent. It is ready to Run().
|
||||
func New(client *api.Client, proxyID string, logger *log.Logger) (*Proxy, error) {
|
||||
p := &Proxy{
|
||||
proxyID: proxyID,
|
||||
connect: connect.NewClient(client),
|
||||
manager: NewManagerWithLogger(logger),
|
||||
cfgWatch: &AgentConfigWatcher{client: client},
|
||||
logger: logger,
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Run the proxy instance until a fatal error occurs or ctx is cancelled.
|
||||
func (p *Proxy) Run(ctx context.Context) error {
|
||||
defer p.manager.StopAll()
|
||||
|
||||
// Watch for config changes (initial setup happens on first "change")
|
||||
for {
|
||||
select {
|
||||
case newCfg := <-p.cfgWatch.Watch():
|
||||
p.logger.Printf("[DEBUG] got new config")
|
||||
if p.cfg == nil {
|
||||
// Initial setup
|
||||
err := p.startPublicListener(ctx, newCfg.PublicListener)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO add/remove upstreams properly based on a diff with current
|
||||
for _, uc := range newCfg.Upstreams {
|
||||
uc.Client = p.connect
|
||||
uc.logger = p.logger
|
||||
err := p.manager.RunProxier(uc.String(), NewUpstream(uc))
|
||||
if err == ErrExists {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(),
|
||||
err)
|
||||
}
|
||||
}
|
||||
p.cfg = newCfg
|
||||
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Proxy) startPublicListener(ctx context.Context,
|
||||
cfg PublicListenerConfig) error {
|
||||
|
||||
// Get TLS creds
|
||||
tlsCfg, err := p.connect.ServerTLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.TLSConfig = tlsCfg
|
||||
|
||||
cfg.logger = p.logger
|
||||
return p.manager.RunProxier("public_listener", NewPublicListener(cfg))
|
||||
}
|
|
@ -0,0 +1,119 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// PublicListener provides an implementation of Proxier that listens for inbound
|
||||
// mTLS connections, authenticates them with the local agent, and if successful
|
||||
// forwards them to the locally configured app.
|
||||
type PublicListener struct {
|
||||
cfg *PublicListenerConfig
|
||||
}
|
||||
|
||||
// PublicListenerConfig contains the most basic parameters needed to start the
|
||||
// proxy.
|
||||
//
|
||||
// Note that the tls.Configs here are expected to be "dynamic" in the sense that
|
||||
// they are expected to use `GetConfigForClient` (added in go 1.8) to return
|
||||
// dynamic config per connection if required.
|
||||
type PublicListenerConfig struct {
|
||||
// BindAddress is the host:port the public mTLS listener will bind to.
|
||||
BindAddress string `json:"bind_address" hcl:"bind_address"`
|
||||
|
||||
// LocalServiceAddress is the host:port for the proxied application. This
|
||||
// should be on loopback or otherwise protected as it's plain TCP.
|
||||
LocalServiceAddress string `json:"local_service_address" hcl:"local_service_address"`
|
||||
|
||||
// TLSConfig config is used for the mTLS listener.
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// LocalConnectTimeout is the timeout for establishing connections with the
|
||||
// local backend. Defaults to 1000 (1s).
|
||||
LocalConnectTimeoutMs int `json:"local_connect_timeout_ms" hcl:"local_connect_timeout_ms"`
|
||||
|
||||
// HandshakeTimeout is the timeout for incoming mTLS clients to complete a
|
||||
// handshake. Setting this low avoids DOS by malicious clients holding
|
||||
// resources open. Defaults to 10000 (10s).
|
||||
HandshakeTimeoutMs int `json:"handshake_timeout_ms" hcl:"handshake_timeout_ms"`
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
func (plc *PublicListenerConfig) applyDefaults() {
|
||||
if plc.LocalConnectTimeoutMs == 0 {
|
||||
plc.LocalConnectTimeoutMs = 1000
|
||||
}
|
||||
if plc.HandshakeTimeoutMs == 0 {
|
||||
plc.HandshakeTimeoutMs = 10000
|
||||
}
|
||||
if plc.logger == nil {
|
||||
plc.logger = log.New(os.Stdout, "", log.LstdFlags)
|
||||
}
|
||||
}
|
||||
|
||||
// NewPublicListener returns a proxy instance with the given config.
|
||||
func NewPublicListener(cfg PublicListenerConfig) *PublicListener {
|
||||
p := &PublicListener{
|
||||
cfg: &cfg,
|
||||
}
|
||||
p.cfg.applyDefaults()
|
||||
return p
|
||||
}
|
||||
|
||||
// Listener implements Proxier
|
||||
func (p *PublicListener) Listener() (net.Listener, error) {
|
||||
l, err := net.Listen("tcp", p.cfg.BindAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tls.NewListener(l, p.cfg.TLSConfig), nil
|
||||
}
|
||||
|
||||
// HandleConn implements Proxier
|
||||
func (p *PublicListener) HandleConn(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
tlsConn, ok := conn.(*tls.Conn)
|
||||
if !ok {
|
||||
return fmt.Errorf("non-TLS conn")
|
||||
}
|
||||
|
||||
// Setup Handshake timer
|
||||
to := time.Duration(p.cfg.HandshakeTimeoutMs) * time.Millisecond
|
||||
err := tlsConn.SetDeadline(time.Now().Add(to))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Force TLS handshake so that abusive clients can't hold resources open
|
||||
err = tlsConn.Handshake()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Handshake OK, clear the deadline
|
||||
err = tlsConn.SetDeadline(time.Time{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Huzzah, open a connection to the backend and let them talk
|
||||
// TODO maybe add a connection pool here?
|
||||
to = time.Duration(p.cfg.LocalConnectTimeoutMs) * time.Millisecond
|
||||
dst, err := net.DialTimeout("tcp", p.cfg.LocalServiceAddress, to)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed dialling local app: %s", err)
|
||||
}
|
||||
|
||||
p.cfg.logger.Printf("[DEBUG] accepted connection from %s", conn.RemoteAddr())
|
||||
|
||||
// Hand conn and dst over to Conn to manage the byte copying.
|
||||
c := NewConn(conn, dst)
|
||||
return c.CopyBytes()
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/connect"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPublicListener(t *testing.T) {
|
||||
addrs := TestLocalBindAddrs(t, 2)
|
||||
|
||||
cfg := PublicListenerConfig{
|
||||
BindAddress: addrs[0],
|
||||
LocalServiceAddress: addrs[1],
|
||||
HandshakeTimeoutMs: 100,
|
||||
LocalConnectTimeoutMs: 100,
|
||||
TLSConfig: connect.TestTLSConfig(t, "ca1", "web"),
|
||||
}
|
||||
|
||||
testApp, err := NewTestTCPServer(t, cfg.LocalServiceAddress)
|
||||
require.Nil(t, err)
|
||||
defer testApp.Close()
|
||||
|
||||
p := NewPublicListener(cfg)
|
||||
|
||||
// Run proxy
|
||||
r := NewRunner("test", p)
|
||||
go r.Listen()
|
||||
defer r.Stop()
|
||||
|
||||
// Proxy and backend are running, play the part of a TLS client using same
|
||||
// cert for now.
|
||||
conn, err := tls.Dial("tcp", cfg.BindAddress, connect.TestTLSConfig(t, "ca1", "web"))
|
||||
require.Nil(t, err)
|
||||
TestEchoConn(t, conn, "")
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// Runner manages the lifecycle of one Proxier.
|
||||
type Runner struct {
|
||||
name string
|
||||
p Proxier
|
||||
|
||||
// Stopping is if a flag that is updated and read atomically
|
||||
stopping int32
|
||||
stopCh chan struct{}
|
||||
// wg is used to signal back to Stop when all goroutines have stopped
|
||||
wg sync.WaitGroup
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// NewRunner returns a Runner ready to Listen.
|
||||
func NewRunner(name string, p Proxier) *Runner {
|
||||
return NewRunnerWithLogger(name, p, log.New(os.Stdout, "", log.LstdFlags))
|
||||
}
|
||||
|
||||
// NewRunnerWithLogger returns a Runner ready to Listen using the specified
|
||||
// log.Logger.
|
||||
func NewRunnerWithLogger(name string, p Proxier, logger *log.Logger) *Runner {
|
||||
return &Runner{
|
||||
name: name,
|
||||
p: p,
|
||||
stopCh: make(chan struct{}),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Listen starts the proxier instance. It blocks until a fatal error occurs or
|
||||
// Stop() is called.
|
||||
func (r *Runner) Listen() error {
|
||||
if atomic.LoadInt32(&r.stopping) == 1 {
|
||||
return ErrStopped
|
||||
}
|
||||
|
||||
l, err := r.p.Listener()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.logger.Printf("[INFO] proxy: %s listening on %s", r.name, l.Addr().String())
|
||||
|
||||
// Run goroutine that will close listener on stop
|
||||
go func() {
|
||||
<-r.stopCh
|
||||
l.Close()
|
||||
r.logger.Printf("[INFO] proxy: %s shutdown", r.name)
|
||||
}()
|
||||
|
||||
// Add one for the accept loop
|
||||
r.wg.Add(1)
|
||||
defer r.wg.Done()
|
||||
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if atomic.LoadInt32(&r.stopping) == 1 {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
go r.handle(conn)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) handle(conn net.Conn) {
|
||||
r.wg.Add(1)
|
||||
defer r.wg.Done()
|
||||
|
||||
// Start a goroutine that will watch for the Runner stopping and close the
|
||||
// conn, or watch for the Proxier closing (e.g. because other end hung up) and
|
||||
// stop the goroutine to avoid leaks
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-r.stopCh:
|
||||
r.logger.Printf("[DEBUG] proxy: %s: terminating conn", r.name)
|
||||
conn.Close()
|
||||
return
|
||||
case <-doneCh:
|
||||
// Connection is already closed, this goroutine not needed any more
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
err := r.p.HandleConn(conn)
|
||||
if err != nil {
|
||||
r.logger.Printf("[DEBUG] proxy: %s: connection terminated: %s", r.name, err)
|
||||
} else {
|
||||
r.logger.Printf("[DEBUG] proxy: %s: connection terminated", r.name)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the Listener and closes any active connections immediately.
|
||||
func (r *Runner) Stop() error {
|
||||
old := atomic.SwapInt32(&r.stopping, 1)
|
||||
if old == 0 {
|
||||
close(r.stopCh)
|
||||
}
|
||||
r.wg.Wait()
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
# Example proxy config with everything specified
|
||||
|
||||
proxy_id = "foo"
|
||||
token = "11111111-2222-3333-4444-555555555555"
|
||||
|
||||
proxied_service_name = "web"
|
||||
proxied_service_namespace = "default"
|
||||
|
||||
# Assumes running consul in dev mode from the repo root...
|
||||
dev_ca_file = "connect/testdata/ca1-ca-consul-internal.cert.pem"
|
||||
dev_service_cert_file = "connect/testdata/ca1-svc-web.cert.pem"
|
||||
dev_service_key_file = "connect/testdata/ca1-svc-web.key.pem"
|
||||
|
||||
public_listener {
|
||||
bind_address = ":9999"
|
||||
local_service_address = "127.0.0.1:5000"
|
||||
local_connect_timeout_ms = 1000
|
||||
handshake_timeout_ms = 5000
|
||||
}
|
||||
|
||||
upstreams = [
|
||||
{
|
||||
local_bind_address = "127.0.0.1:6000"
|
||||
destination_name = "db"
|
||||
destination_namespace = "default"
|
||||
destination_type = "service"
|
||||
connect_timeout_ms = 10000
|
||||
},
|
||||
{
|
||||
local_bind_address = "127.0.0.1:6001"
|
||||
destination_name = "geo-cache"
|
||||
destination_namespace = "default"
|
||||
destination_type = "prepared_query"
|
||||
connect_timeout_ms = 10000
|
||||
}
|
||||
]
|
|
@ -0,0 +1,170 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/hashicorp/consul/lib/freeport"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestLocalBindAddrs returns n localhost address:port strings with free ports
|
||||
// for binding test listeners to.
|
||||
func TestLocalBindAddrs(t testing.T, n int) []string {
|
||||
ports := freeport.GetT(t, n)
|
||||
addrs := make([]string, n)
|
||||
for i, p := range ports {
|
||||
addrs[i] = fmt.Sprintf("localhost:%d", p)
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
||||
// TestTCPServer is a simple TCP echo server for use during tests.
|
||||
type TestTCPServer struct {
|
||||
l net.Listener
|
||||
stopped int32
|
||||
accepted, closed, active int32
|
||||
}
|
||||
|
||||
// NewTestTCPServer opens as a listening socket on the given address and returns
|
||||
// a TestTCPServer serving requests to it. The server is already started and can
|
||||
// be stopped by calling Close().
|
||||
func NewTestTCPServer(t testing.T, addr string) (*TestTCPServer, error) {
|
||||
l, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Printf("test tcp server listening on %s", addr)
|
||||
s := &TestTCPServer{
|
||||
l: l,
|
||||
}
|
||||
go s.accept()
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Close stops the server
|
||||
func (s *TestTCPServer) Close() {
|
||||
atomic.StoreInt32(&s.stopped, 1)
|
||||
if s.l != nil {
|
||||
s.l.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TestTCPServer) accept() error {
|
||||
for {
|
||||
conn, err := s.l.Accept()
|
||||
if err != nil {
|
||||
if atomic.LoadInt32(&s.stopped) == 1 {
|
||||
log.Printf("test tcp echo server %s stopped", s.l.Addr())
|
||||
return nil
|
||||
}
|
||||
log.Printf("test tcp echo server %s failed: %s", s.l.Addr(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
atomic.AddInt32(&s.accepted, 1)
|
||||
atomic.AddInt32(&s.active, 1)
|
||||
|
||||
go func(c net.Conn) {
|
||||
io.Copy(c, c)
|
||||
atomic.AddInt32(&s.closed, 1)
|
||||
atomic.AddInt32(&s.active, -1)
|
||||
}(conn)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEchoConn attempts to write some bytes to conn and expects to read them
|
||||
// back within a short timeout (10ms). If prefix is not empty we expect it to be
|
||||
// poresent at the start of all echoed responses (for example to distinguish
|
||||
// between multiple echo server instances).
|
||||
func TestEchoConn(t testing.T, conn net.Conn, prefix string) {
|
||||
t.Helper()
|
||||
|
||||
// Write some bytes and read them back
|
||||
n, err := conn.Write([]byte("Hello World"))
|
||||
require.Equal(t, 11, n)
|
||||
require.Nil(t, err)
|
||||
|
||||
expectLen := 11 + len(prefix)
|
||||
|
||||
buf := make([]byte, expectLen)
|
||||
// read until our buffer is full - it might be separate packets if prefix is
|
||||
// in use.
|
||||
got := 0
|
||||
for got < expectLen {
|
||||
n, err = conn.Read(buf[got:])
|
||||
require.Nil(t, err)
|
||||
got += n
|
||||
}
|
||||
require.Equal(t, expectLen, got)
|
||||
require.Equal(t, prefix+"Hello World", string(buf[:]))
|
||||
}
|
||||
|
||||
// TestConnectClient is a testing mock that implements connect.Client but
|
||||
// stubs the methods to make testing simpler.
|
||||
type TestConnectClient struct {
|
||||
Server *TestTCPServer
|
||||
TLSConfig *tls.Config
|
||||
Calls []callTuple
|
||||
}
|
||||
type callTuple struct {
|
||||
typ, ns, name string
|
||||
}
|
||||
|
||||
// ServerTLSConfig implements connect.Client
|
||||
func (c *TestConnectClient) ServerTLSConfig() (*tls.Config, error) {
|
||||
return c.TLSConfig, nil
|
||||
}
|
||||
|
||||
// DialService implements connect.Client
|
||||
func (c *TestConnectClient) DialService(ctx context.Context, namespace,
|
||||
name string) (net.Conn, error) {
|
||||
|
||||
c.Calls = append(c.Calls, callTuple{"service", namespace, name})
|
||||
|
||||
// Actually returning a vanilla TCP conn not a TLS one but the caller
|
||||
// shouldn't care for tests since this interface should hide all the TLS
|
||||
// config and verification.
|
||||
return net.Dial("tcp", c.Server.l.Addr().String())
|
||||
}
|
||||
|
||||
// DialPreparedQuery implements connect.Client
|
||||
func (c *TestConnectClient) DialPreparedQuery(ctx context.Context, namespace,
|
||||
name string) (net.Conn, error) {
|
||||
|
||||
c.Calls = append(c.Calls, callTuple{"prepared_query", namespace, name})
|
||||
|
||||
// Actually returning a vanilla TCP conn not a TLS one but the caller
|
||||
// shouldn't care for tests since this interface should hide all the TLS
|
||||
// config and verification.
|
||||
return net.Dial("tcp", c.Server.l.Addr().String())
|
||||
}
|
||||
|
||||
// TestProxier is a simple Proxier instance that can be used in tests.
|
||||
type TestProxier struct {
|
||||
// Addr to listen on
|
||||
Addr string
|
||||
// Prefix to write first before echoing on new connections
|
||||
Prefix string
|
||||
}
|
||||
|
||||
// Listener implements Proxier
|
||||
func (p *TestProxier) Listener() (net.Listener, error) {
|
||||
return net.Listen("tcp", p.Addr)
|
||||
}
|
||||
|
||||
// HandleConn implements Proxier
|
||||
func (p *TestProxier) HandleConn(conn net.Conn) error {
|
||||
_, err := conn.Write([]byte(p.Prefix))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(conn, conn)
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,261 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/connect"
|
||||
)
|
||||
|
||||
// Upstream provides an implementation of Proxier that listens for inbound TCP
|
||||
// connections on the private network shared with the proxied application
|
||||
// (typically localhost). For each accepted connection from the app, it uses the
|
||||
// connect.Client to discover an instance and connect over mTLS.
|
||||
type Upstream struct {
|
||||
cfg *UpstreamConfig
|
||||
}
|
||||
|
||||
// UpstreamConfig configures the upstream
|
||||
type UpstreamConfig struct {
|
||||
// Client is the connect client to perform discovery with
|
||||
Client connect.Client
|
||||
|
||||
// LocalAddress is the host:port to listen on for local app connections.
|
||||
LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr"`
|
||||
|
||||
// DestinationName is the service name of the destination.
|
||||
DestinationName string `json:"destination_name" hcl:"destination_name,attr"`
|
||||
|
||||
// DestinationNamespace is the namespace of the destination.
|
||||
DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr"`
|
||||
|
||||
// DestinationType determines which service discovery method is used to find a
|
||||
// candidate instance to connect to.
|
||||
DestinationType string `json:"destination_type" hcl:"destination_type,attr"`
|
||||
|
||||
// ConnectTimeout is the timeout for establishing connections with the remote
|
||||
// service instance. Defaults to 10,000 (10s).
|
||||
ConnectTimeoutMs int `json:"connect_timeout_ms" hcl:"connect_timeout_ms,attr"`
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
func (uc *UpstreamConfig) applyDefaults() {
|
||||
if uc.ConnectTimeoutMs == 0 {
|
||||
uc.ConnectTimeoutMs = 10000
|
||||
}
|
||||
if uc.logger == nil {
|
||||
uc.logger = log.New(os.Stdout, "", log.LstdFlags)
|
||||
}
|
||||
}
|
||||
|
||||
// String returns a string that uniquely identifies the Upstream. Used for
|
||||
// identifying the upstream in log output and map keys.
|
||||
func (uc *UpstreamConfig) String() string {
|
||||
return fmt.Sprintf("%s->%s:%s/%s", uc.LocalBindAddress, uc.DestinationType,
|
||||
uc.DestinationNamespace, uc.DestinationName)
|
||||
}
|
||||
|
||||
// NewUpstream returns an outgoing proxy instance with the given config.
|
||||
func NewUpstream(cfg UpstreamConfig) *Upstream {
|
||||
u := &Upstream{
|
||||
cfg: &cfg,
|
||||
}
|
||||
u.cfg.applyDefaults()
|
||||
return u
|
||||
}
|
||||
|
||||
// String returns a string that uniquely identifies the Upstream. Used for
|
||||
// identifying the upstream in log output and map keys.
|
||||
func (u *Upstream) String() string {
|
||||
return u.cfg.String()
|
||||
}
|
||||
|
||||
// Listener implements Proxier
|
||||
func (u *Upstream) Listener() (net.Listener, error) {
|
||||
return net.Listen("tcp", u.cfg.LocalBindAddress)
|
||||
}
|
||||
|
||||
// HandleConn implements Proxier
|
||||
func (u *Upstream) HandleConn(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
|
||||
// Discover destination instance
|
||||
dst, err := u.discoverAndDial()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Hand conn and dst over to Conn to manage the byte copying.
|
||||
c := NewConn(conn, dst)
|
||||
return c.CopyBytes()
|
||||
}
|
||||
|
||||
func (u *Upstream) discoverAndDial() (net.Conn, error) {
|
||||
to := time.Duration(u.cfg.ConnectTimeoutMs) * time.Millisecond
|
||||
ctx, cancel := context.WithTimeout(context.Background(), to)
|
||||
defer cancel()
|
||||
|
||||
switch u.cfg.DestinationType {
|
||||
case "service":
|
||||
return u.cfg.Client.DialService(ctx, u.cfg.DestinationNamespace,
|
||||
u.cfg.DestinationName)
|
||||
|
||||
case "prepared_query":
|
||||
return u.cfg.Client.DialPreparedQuery(ctx, u.cfg.DestinationNamespace,
|
||||
u.cfg.DestinationName)
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid destination type %s", u.cfg.DestinationType)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// Upstream represents a service that the proxied application needs to connect
|
||||
// out to. It provides a dedication local TCP listener (usually listening only
|
||||
// on loopback) and forwards incoming connections to the proxy to handle.
|
||||
type Upstream struct {
|
||||
cfg *UpstreamConfig
|
||||
wg sync.WaitGroup
|
||||
|
||||
proxy *Proxy
|
||||
fatalErr error
|
||||
}
|
||||
|
||||
// NewUpstream creates an upstream ready to attach to a proxy instance with
|
||||
// Proxy.AddUpstream. An Upstream can only be attached to single Proxy instance
|
||||
// at once.
|
||||
func NewUpstream(p *Proxy, cfg *UpstreamConfig) *Upstream {
|
||||
return &Upstream{
|
||||
cfg: cfg,
|
||||
proxy: p,
|
||||
shutdown: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// UpstreamConfig configures the upstream
|
||||
type UpstreamConfig struct {
|
||||
// LocalAddress is the host:port to listen on for local app connections.
|
||||
LocalAddress string
|
||||
|
||||
// DestinationName is the service name of the destination.
|
||||
DestinationName string
|
||||
|
||||
// DestinationNamespace is the namespace of the destination.
|
||||
DestinationNamespace string
|
||||
|
||||
// DestinationType determines which service discovery method is used to find a
|
||||
// candidate instance to connect to.
|
||||
DestinationType string
|
||||
}
|
||||
|
||||
// String returns a string representation for the upstream for debugging or
|
||||
// use as a unique key.
|
||||
func (uc *UpstreamConfig) String() string {
|
||||
return fmt.Sprintf("%s->%s:%s/%s", uc.LocalAddress, uc.DestinationType,
|
||||
uc.DestinationNamespace, uc.DestinationName)
|
||||
}
|
||||
|
||||
func (u *Upstream) listen() error {
|
||||
l, err := net.Listen("tcp", u.cfg.LocalAddress)
|
||||
if err != nil {
|
||||
u.fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go u.discoverAndConnect(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *Upstream) discoverAndConnect(src net.Conn) {
|
||||
// First, we need an upstream instance from Consul to connect to
|
||||
dstAddrs, err := u.discoverInstances()
|
||||
if err != nil {
|
||||
u.fatal(fmt.Errorf("failed to discover upstream instances: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
if len(dstAddrs) < 1 {
|
||||
log.Printf("[INFO] no instances found for %s", len(dstAddrs), u)
|
||||
}
|
||||
|
||||
// Attempt connection to first one that works
|
||||
// TODO: configurable number/deadline?
|
||||
for idx, addr := range dstAddrs {
|
||||
err := u.proxy.startProxyingConn(src, addr, false)
|
||||
if err != nil {
|
||||
log.Printf("[INFO] failed to connect to %s: %s (%d of %d)", addr, err,
|
||||
idx+1, len(dstAddrs))
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[INFO] failed to connect to all %d instances for %s",
|
||||
len(dstAddrs), u)
|
||||
}
|
||||
|
||||
func (u *Upstream) discoverInstances() ([]string, error) {
|
||||
switch u.cfg.DestinationType {
|
||||
case "service":
|
||||
svcs, _, err := u.cfg.Consul.Health().Service(u.cfg.DestinationName, "",
|
||||
true, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addrs := make([]string, len(svcs))
|
||||
|
||||
// Shuffle order as we go since health endpoint doesn't
|
||||
perm := rand.Perm(len(addrs))
|
||||
for i, se := range svcs {
|
||||
// Pick location in output array based on next permutation position
|
||||
j := perm[i]
|
||||
addrs[j] = fmt.Sprintf("%s:%d", se.Service.Address, se.Service.Port)
|
||||
}
|
||||
|
||||
return addrs, nil
|
||||
|
||||
case "prepared_query":
|
||||
pqr, _, err := u.cfg.Consul.PreparedQuery().Execute(u.cfg.DestinationName,
|
||||
nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addrs := make([]string, 0, len(svcs))
|
||||
for _, se := range pqr.Nodes {
|
||||
addrs = append(addrs, fmt.Sprintf("%s:%d", se.Service.Address,
|
||||
se.Service.Port))
|
||||
}
|
||||
|
||||
// PreparedQuery execution already shuffles the result
|
||||
return addrs, nil
|
||||
|
||||
default:
|
||||
u.fatal(fmt.Errorf("invalid destination type %s", u.cfg.DestinationType))
|
||||
}
|
||||
}
|
||||
|
||||
func (u *Upstream) fatal(err Error) {
|
||||
log.Printf("[ERROR] upstream %s stopping on error: %s", u.cfg.LocalAddress,
|
||||
err)
|
||||
u.fatalErr = err
|
||||
}
|
||||
|
||||
// String returns a string representation for the upstream for debugging or
|
||||
// use as a unique key.
|
||||
func (u *Upstream) String() string {
|
||||
return u.cfg.String()
|
||||
}
|
||||
*/
|
|
@ -0,0 +1,75 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/connect"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUpstream(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg UpstreamConfig
|
||||
}{
|
||||
{
|
||||
name: "service",
|
||||
cfg: UpstreamConfig{
|
||||
DestinationType: "service",
|
||||
DestinationNamespace: "default",
|
||||
DestinationName: "db",
|
||||
ConnectTimeoutMs: 100,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "prepared_query",
|
||||
cfg: UpstreamConfig{
|
||||
DestinationType: "prepared_query",
|
||||
DestinationNamespace: "default",
|
||||
DestinationName: "geo-db",
|
||||
ConnectTimeoutMs: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
addrs := TestLocalBindAddrs(t, 2)
|
||||
|
||||
testApp, err := NewTestTCPServer(t, addrs[0])
|
||||
require.Nil(t, err)
|
||||
defer testApp.Close()
|
||||
|
||||
// Create mock client that will "discover" our test tcp server as a target and
|
||||
// skip TLS altogether.
|
||||
client := &TestConnectClient{
|
||||
Server: testApp,
|
||||
TLSConfig: connect.TestTLSConfig(t, "ca1", "web"),
|
||||
}
|
||||
|
||||
// Override cfg params
|
||||
tt.cfg.LocalBindAddress = addrs[1]
|
||||
tt.cfg.Client = client
|
||||
|
||||
u := NewUpstream(tt.cfg)
|
||||
|
||||
// Run proxy
|
||||
r := NewRunner("test", u)
|
||||
go r.Listen()
|
||||
defer r.Stop()
|
||||
|
||||
// Proxy and fake remote service are running, play the part of the app
|
||||
// connecting to a remote connect service over TCP.
|
||||
conn, err := net.Dial("tcp", tt.cfg.LocalBindAddress)
|
||||
require.Nil(t, err)
|
||||
TestEchoConn(t, conn, "")
|
||||
|
||||
// Validate that discovery actually was called as we expected
|
||||
require.Len(t, client.Calls, 1)
|
||||
require.Equal(t, tt.cfg.DestinationType, client.Calls[0].typ)
|
||||
require.Equal(t, tt.cfg.DestinationNamespace, client.Calls[0].ns)
|
||||
require.Equal(t, tt.cfg.DestinationName, client.Calls[0].name)
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue