From 69d5efdbbdab05244a4a7224ac0c022e939e44e4 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Wed, 21 Mar 2018 22:35:00 +0000 Subject: [PATCH] Original proxy and connect.Client implementation. Working end to end. --- command/commands_oss.go | 4 + command/connect/connect.go | 40 +++ command/connect/connect_test.go | 13 + command/connect/proxy/proxy.go | 166 +++++++++++ command/connect/proxy/proxy_test.go | 1 + connect/auth.go | 43 +++ connect/client.go | 256 +++++++++++++++++ connect/client_test.go | 148 ++++++++++ .../testdata/ca1-ca-consul-internal.cert.pem | 14 + .../testdata/ca1-ca-consul-internal.key.pem | 5 + connect/testdata/ca1-svc-cache.cert.pem | 14 + connect/testdata/ca1-svc-cache.key.pem | 5 + connect/testdata/ca1-svc-db.cert.pem | 13 + connect/testdata/ca1-svc-db.key.pem | 5 + connect/testdata/ca1-svc-web.cert.pem | 13 + connect/testdata/ca1-svc-web.key.pem | 5 + connect/testdata/ca2-ca-vault.cert.pem | 14 + connect/testdata/ca2-ca-vault.key.pem | 5 + connect/testdata/ca2-svc-cache.cert.pem | 13 + connect/testdata/ca2-svc-cache.key.pem | 5 + connect/testdata/ca2-svc-db.cert.pem | 13 + connect/testdata/ca2-svc-db.key.pem | 5 + connect/testdata/ca2-svc-web.cert.pem | 13 + connect/testdata/ca2-svc-web.key.pem | 5 + connect/testdata/ca2-xc-by-ca1.cert.pem | 14 + connect/testdata/mkcerts.go | 243 ++++++++++++++++ connect/testing.go | 88 ++++++ connect/tls.go | 124 +++++++++ connect/tls_test.go | 45 +++ proxy/config.go | 111 ++++++++ proxy/config_test.go | 46 +++ proxy/conn.go | 48 ++++ proxy/conn_test.go | 119 ++++++++ proxy/manager.go | 140 ++++++++++ proxy/manager_test.go | 76 +++++ proxy/proxier.go | 32 +++ proxy/proxy.go | 112 ++++++++ proxy/public_listener.go | 119 ++++++++ proxy/public_listener_test.go | 38 +++ proxy/runner.go | 118 ++++++++ proxy/testdata/config-kitchensink.hcl | 36 +++ proxy/testing.go | 170 ++++++++++++ proxy/upstream.go | 261 ++++++++++++++++++ proxy/upstream_test.go | 75 +++++ 44 files changed, 2833 insertions(+) create mode 100644 command/connect/connect.go create mode 100644 command/connect/connect_test.go create mode 100644 command/connect/proxy/proxy.go create mode 100644 command/connect/proxy/proxy_test.go create mode 100644 connect/auth.go create mode 100644 connect/client.go create mode 100644 connect/client_test.go create mode 100644 connect/testdata/ca1-ca-consul-internal.cert.pem create mode 100644 connect/testdata/ca1-ca-consul-internal.key.pem create mode 100644 connect/testdata/ca1-svc-cache.cert.pem create mode 100644 connect/testdata/ca1-svc-cache.key.pem create mode 100644 connect/testdata/ca1-svc-db.cert.pem create mode 100644 connect/testdata/ca1-svc-db.key.pem create mode 100644 connect/testdata/ca1-svc-web.cert.pem create mode 100644 connect/testdata/ca1-svc-web.key.pem create mode 100644 connect/testdata/ca2-ca-vault.cert.pem create mode 100644 connect/testdata/ca2-ca-vault.key.pem create mode 100644 connect/testdata/ca2-svc-cache.cert.pem create mode 100644 connect/testdata/ca2-svc-cache.key.pem create mode 100644 connect/testdata/ca2-svc-db.cert.pem create mode 100644 connect/testdata/ca2-svc-db.key.pem create mode 100644 connect/testdata/ca2-svc-web.cert.pem create mode 100644 connect/testdata/ca2-svc-web.key.pem create mode 100644 connect/testdata/ca2-xc-by-ca1.cert.pem create mode 100644 connect/testdata/mkcerts.go create mode 100644 connect/testing.go create mode 100644 connect/tls.go create mode 100644 connect/tls_test.go create mode 100644 proxy/config.go create mode 100644 proxy/config_test.go create mode 100644 proxy/conn.go create mode 100644 proxy/conn_test.go create mode 100644 proxy/manager.go create mode 100644 proxy/manager_test.go create mode 100644 proxy/proxier.go create mode 100644 proxy/proxy.go create mode 100644 proxy/public_listener.go create mode 100644 proxy/public_listener_test.go create mode 100644 proxy/runner.go create mode 100644 proxy/testdata/config-kitchensink.hcl create mode 100644 proxy/testing.go create mode 100644 proxy/upstream.go create mode 100644 proxy/upstream_test.go diff --git a/command/commands_oss.go b/command/commands_oss.go index 43fbeb29c9..c1e3e794ab 100644 --- a/command/commands_oss.go +++ b/command/commands_oss.go @@ -6,6 +6,8 @@ import ( catlistdc "github.com/hashicorp/consul/command/catalog/list/dc" catlistnodes "github.com/hashicorp/consul/command/catalog/list/nodes" catlistsvc "github.com/hashicorp/consul/command/catalog/list/services" + "github.com/hashicorp/consul/command/connect" + "github.com/hashicorp/consul/command/connect/proxy" "github.com/hashicorp/consul/command/event" "github.com/hashicorp/consul/command/exec" "github.com/hashicorp/consul/command/forceleave" @@ -58,6 +60,8 @@ func init() { Register("catalog datacenters", func(ui cli.Ui) (cli.Command, error) { return catlistdc.New(ui), nil }) Register("catalog nodes", func(ui cli.Ui) (cli.Command, error) { return catlistnodes.New(ui), nil }) Register("catalog services", func(ui cli.Ui) (cli.Command, error) { return catlistsvc.New(ui), nil }) + Register("connect", func(ui cli.Ui) (cli.Command, error) { return connect.New(), nil }) + Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil }) Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil }) Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil }) Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil }) diff --git a/command/connect/connect.go b/command/connect/connect.go new file mode 100644 index 0000000000..60c2388766 --- /dev/null +++ b/command/connect/connect.go @@ -0,0 +1,40 @@ +package connect + +import ( + "github.com/hashicorp/consul/command/flags" + "github.com/mitchellh/cli" +) + +func New() *cmd { + return &cmd{} +} + +type cmd struct{} + +func (c *cmd) Run(args []string) int { + return cli.RunResultHelp +} + +func (c *cmd) Synopsis() string { + return synopsis +} + +func (c *cmd) Help() string { + return flags.Usage(help, nil) +} + +const synopsis = "Interact with Consul Connect" +const help = ` +Usage: consul connect [options] [args] + + This command has subcommands for interacting with Consul Connect. + + Here are some simple examples, and more detailed examples are available + in the subcommands or the documentation. + + Run the built-in Connect mTLS proxy + + $ consul connect proxy + + For more examples, ask for subcommand help or view the documentation. +` diff --git a/command/connect/connect_test.go b/command/connect/connect_test.go new file mode 100644 index 0000000000..95c8ebd58e --- /dev/null +++ b/command/connect/connect_test.go @@ -0,0 +1,13 @@ +package connect + +import ( + "strings" + "testing" +) + +func TestCatalogCommand_noTabs(t *testing.T) { + t.Parallel() + if strings.ContainsRune(New().Help(), '\t') { + t.Fatal("help has tabs") + } +} diff --git a/command/connect/proxy/proxy.go b/command/connect/proxy/proxy.go new file mode 100644 index 0000000000..237f4b7e2f --- /dev/null +++ b/command/connect/proxy/proxy.go @@ -0,0 +1,166 @@ +package proxy + +import ( + "context" + "flag" + "fmt" + "io" + "log" + "net/http" + // Expose pprof if configured + _ "net/http/pprof" + + "github.com/hashicorp/consul/command/flags" + proxyImpl "github.com/hashicorp/consul/proxy" + + "github.com/hashicorp/consul/logger" + "github.com/hashicorp/logutils" + "github.com/mitchellh/cli" +) + +func New(ui cli.Ui, shutdownCh <-chan struct{}) *cmd { + c := &cmd{UI: ui, shutdownCh: shutdownCh} + c.init() + return c +} + +type cmd struct { + UI cli.Ui + flags *flag.FlagSet + http *flags.HTTPFlags + help string + + shutdownCh <-chan struct{} + + logFilter *logutils.LevelFilter + logOutput io.Writer + logger *log.Logger + + // flags + logLevel string + cfgFile string + proxyID string + pprofAddr string +} + +func (c *cmd) init() { + c.flags = flag.NewFlagSet("", flag.ContinueOnError) + + c.flags.StringVar(&c.cfgFile, "insecure-dev-config", "", + "If set, proxy config is read on startup from this file (in HCL or JSON"+ + "format). If a config file is given, the proxy will use that instead of "+ + "querying the local agent for it's configuration. It will not reload it "+ + "except on startup. In this mode the proxy WILL NOT authorize incoming "+ + "connections with the local agent which is totally insecure. This is "+ + "ONLY for development and testing.") + + c.flags.StringVar(&c.proxyID, "proxy-id", "", + "The proxy's ID on the local agent.") + + c.flags.StringVar(&c.logLevel, "log-level", "INFO", + "Specifies the log level.") + + c.flags.StringVar(&c.pprofAddr, "pprof-addr", "", + "Enable debugging via pprof. Providing a host:port (or just ':port') "+ + "enables profiling HTTP endpoints on that address.") + + c.http = &flags.HTTPFlags{} + flags.Merge(c.flags, c.http.ClientFlags()) + flags.Merge(c.flags, c.http.ServerFlags()) + c.help = flags.Usage(help, c.flags) +} + +func (c *cmd) Run(args []string) int { + if err := c.flags.Parse(args); err != nil { + return 1 + } + + // Setup the log outputs + logConfig := &logger.Config{ + LogLevel: c.logLevel, + } + logFilter, logGate, _, logOutput, ok := logger.Setup(logConfig, c.UI) + if !ok { + return 1 + } + c.logFilter = logFilter + c.logOutput = logOutput + c.logger = log.New(logOutput, "", log.LstdFlags) + + // Enable Pprof if needed + if c.pprofAddr != "" { + go func() { + c.UI.Output(fmt.Sprintf("Starting pprof HTTP endpoints on "+ + "http://%s/debug/pprof", c.pprofAddr)) + log.Fatal(http.ListenAndServe(c.pprofAddr, nil)) + }() + } + + // Setup Consul client + client, err := c.http.APIClient() + if err != nil { + c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + + var p *proxyImpl.Proxy + if c.cfgFile != "" { + c.UI.Info("Configuring proxy locally from " + c.cfgFile) + + p, err = proxyImpl.NewFromConfigFile(client, c.cfgFile, c.logger) + if err != nil { + c.UI.Error(fmt.Sprintf("Failed configuring from file: %s", err)) + return 1 + } + + } else { + p, err = proxyImpl.New(client, c.proxyID, c.logger) + if err != nil { + c.UI.Error(fmt.Sprintf("Failed configuring from agent: %s", err)) + return 1 + } + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + err := p.Run(ctx) + if err != nil { + c.UI.Error(fmt.Sprintf("Failed running proxy: %s", err)) + } + // If we exited early due to a fatal error, need to unblock the main + // routine. But we can't close shutdownCh since it might already be closed + // by a signal and there is no way to tell. We also can't send on it to + // unblock main routine since it's typed as receive only. So the best thing + // we can do is cancel the context and have the main routine select on both. + cancel() + }() + + c.UI.Output("Consul Connect proxy running!") + + c.UI.Output("Log data will now stream in as it occurs:\n") + logGate.Flush() + + // Wait for shutdown or context cancel (see Run() goroutine above) + select { + case <-c.shutdownCh: + cancel() + case <-ctx.Done(): + } + c.UI.Output("Consul Connect proxy shutdown") + return 0 +} + +func (c *cmd) Synopsis() string { + return synopsis +} + +func (c *cmd) Help() string { + return c.help +} + +const synopsis = "Runs a Consul Connect proxy" +const help = ` +Usage: consul proxy [options] + + Starts a Consul Connect proxy and runs until an interrupt is received. +` diff --git a/command/connect/proxy/proxy_test.go b/command/connect/proxy/proxy_test.go new file mode 100644 index 0000000000..943b369ffe --- /dev/null +++ b/command/connect/proxy/proxy_test.go @@ -0,0 +1 @@ +package proxy diff --git a/connect/auth.go b/connect/auth.go new file mode 100644 index 0000000000..73c16f0bf5 --- /dev/null +++ b/connect/auth.go @@ -0,0 +1,43 @@ +package connect + +import "crypto/x509" + +// Auther is the interface that provides both Authentication and Authorization +// for an mTLS connection. It's only method is compatible with +// tls.Config.VerifyPeerCertificate. +type Auther interface { + // Auth is called during tls Connection establishment to Authenticate and + // Authorize the presented peer. Note that verifiedChains must not be relied + // upon as we typically have to skip Go's internal verification so the + // implementation takes full responsibility to validating the certificate + // against known roots. It is also up to the user of the interface to ensure + // appropriate validation is performed for client or server end by arranging + // for an appropriate implementation to be hooked into the tls.Config used. + Auth(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error +} + +// ClientAuther is used to auth Clients connecting to a Server. +type ClientAuther struct{} + +// Auth implements Auther +func (a *ClientAuther) Auth(rawCerts [][]byte, + verifiedChains [][]*x509.Certificate) error { + + // TODO(banks): implement path validation and AuthZ + return nil +} + +// ServerAuther is used to auth the Server identify from a connecting Client. +type ServerAuther struct { + // TODO(banks): We'll need a way to pass the expected service identity (name, + // namespace, dc, cluster) here based on discovery result. +} + +// Auth implements Auther +func (a *ServerAuther) Auth(rawCerts [][]byte, + verifiedChains [][]*x509.Certificate) error { + + // TODO(banks): implement path validation and verify URI matches the target + // service we intended to connect to. + return nil +} diff --git a/connect/client.go b/connect/client.go new file mode 100644 index 0000000000..867bf0db54 --- /dev/null +++ b/connect/client.go @@ -0,0 +1,256 @@ +package connect + +import ( + "context" + "crypto/tls" + "fmt" + "math/rand" + "net" + + "github.com/hashicorp/consul/api" +) + +// CertStatus indicates whether the Client currently has valid certificates for +// incoming and outgoing connections. +type CertStatus int + +const ( + // CertStatusUnknown is the zero value for CertStatus which may be returned + // when a watch channel is closed on shutdown. It has no other meaning. + CertStatusUnknown CertStatus = iota + + // CertStatusOK indicates the client has valid certificates and trust roots to + // Authenticate incoming and outgoing connections. + CertStatusOK + + // CertStatusPending indicates the client is waiting to be issued initial + // certificates, or that it's certificates have expired and it's waiting to be + // issued new ones. In this state all incoming and outgoing connections will + // fail. + CertStatusPending +) + +func (s CertStatus) String() string { + switch s { + case CertStatusOK: + return "OK" + case CertStatusPending: + return "pending" + case CertStatusUnknown: + fallthrough + default: + return "unknown" + } +} + +// Client is the interface a basic client implementation must support. +type Client interface { + // TODO(banks): build this and test it + // CertStatus returns the current status of the client's certificates. It can + // be used to determine if the Client is able to service requests at the + // current time. + //CertStatus() CertStatus + + // TODO(banks): build this and test it + // WatchCertStatus returns a channel that is notified on all status changes. + // Note that a message on the channel isn't guaranteed to be different so it's + // value should be inspected. During Client shutdown the channel will be + // closed returning a zero type which is equivalent to CertStatusUnknown. + //WatchCertStatus() <-chan CertStatus + + // ServerTLSConfig returns the *tls.Config to be used when creating a TCP + // listener that should accept Connect connections. It is likely that at + // startup the tlsCfg returned will not be immediately usable since + // certificates are typically fetched from the agent asynchronously. In this + // case it's still safe to listen with the provided config, but auth failures + // will occur until initial certificate discovery is complete. In general at + // any time it is possible for certificates to expire before new replacements + // have been issued due to local network errors so the server may not actually + // have a working certificate configuration at any time, however as soon as + // valid certs can be issued it will automatically start working again so + // should take no action. + ServerTLSConfig() (*tls.Config, error) + + // DialService opens a new connection to the named service registered in + // Consul. It will perform service discovery to find healthy instances. If + // there is an error during connection it is returned and the caller may call + // again. The client implementation makes a best effort to make consecutive + // Dials against different instances either by randomising the list and/or + // maintaining a local memory of which instances recently failed. If the + // context passed times out before connection is established and verified an + // error is returned. + DialService(ctx context.Context, namespace, name string) (net.Conn, error) + + // DialPreparedQuery opens a new connection by executing the named Prepared + // Query against the local Consul agent, and picking one of the returned + // instances to connect to. It will perform service discovery with the same + // semantics as DialService. + DialPreparedQuery(ctx context.Context, namespace, name string) (net.Conn, error) +} + +/* + +Maybe also convenience wrappers for: + - listening TLS conn with right config + - http.ListenAndServeTLS equivalent + +*/ + +// AgentClient is the primary implementation of a connect.Client which +// communicates with the local Consul agent. +type AgentClient struct { + agent *api.Client + tlsCfg *ReloadableTLSConfig +} + +// NewClient returns an AgentClient to allow consuming and providing +// Connect-enabled network services. +func NewClient(agent *api.Client) Client { + // TODO(banks): hook up fetching certs from Agent and updating tlsCfg on cert + // delivery/change. Perhaps need to make + return &AgentClient{ + agent: agent, + tlsCfg: NewReloadableTLSConfig(defaultTLSConfig()), + } +} + +// NewInsecureDevClientWithLocalCerts returns an AgentClient that will still do +// service discovery via the local agent but will use externally provided +// certificates and skip authorization. This is intended just for development +// and must not be used ever in production. +func NewInsecureDevClientWithLocalCerts(agent *api.Client, caFile, certFile, + keyFile string) (Client, error) { + + cfg, err := devTLSConfigFromFiles(caFile, certFile, keyFile) + if err != nil { + return nil, err + } + + return &AgentClient{ + agent: agent, + tlsCfg: NewReloadableTLSConfig(cfg), + }, nil +} + +// ServerTLSConfig implements Client +func (c *AgentClient) ServerTLSConfig() (*tls.Config, error) { + return c.tlsCfg.ServerTLSConfig(), nil +} + +// DialService implements Client +func (c *AgentClient) DialService(ctx context.Context, namespace, + name string) (net.Conn, error) { + return c.dial(ctx, "service", namespace, name) +} + +// DialPreparedQuery implements Client +func (c *AgentClient) DialPreparedQuery(ctx context.Context, namespace, + name string) (net.Conn, error) { + return c.dial(ctx, "prepared_query", namespace, name) +} + +func (c *AgentClient) dial(ctx context.Context, discoveryType, namespace, + name string) (net.Conn, error) { + + svcs, err := c.discoverInstances(ctx, discoveryType, namespace, name) + if err != nil { + return nil, err + } + + svc, err := c.pickInstance(svcs) + if err != nil { + return nil, err + } + if svc == nil { + return nil, fmt.Errorf("no healthy services discovered") + } + + // OK we have a service we can dial! We need a ClientAuther that will validate + // the connection is legit. + + // TODO(banks): implement ClientAuther properly to actually verify connected + // cert matches the expected service/cluster etc. based on svc. + auther := &ClientAuther{} + tlsConfig := c.tlsCfg.TLSConfig(auther) + + // Resolve address TODO(banks): I expected this to happen magically in the + // agent at registration time if I register with no explicit address but + // apparently doesn't. This is a quick hack to make it work for now, need to + // see if there is a better shared code path for doing this. + addr := svc.Service.Address + if addr == "" { + addr = svc.Node.Address + } + var dialer net.Dialer + tcpConn, err := dialer.DialContext(ctx, "tcp", + fmt.Sprintf("%s:%d", addr, svc.Service.Port)) + if err != nil { + return nil, err + } + + tlsConn := tls.Client(tcpConn, tlsConfig) + err = tlsConn.Handshake() + if err != nil { + tlsConn.Close() + return nil, err + } + + return tlsConn, nil +} + +// pickInstance returns an instance from the given list to try to connect to. It +// may be made pluggable later, for now it just picks a random one regardless of +// whether the list is already shuffled. +func (c *AgentClient) pickInstance(svcs []*api.ServiceEntry) (*api.ServiceEntry, error) { + if len(svcs) < 1 { + return nil, nil + } + idx := rand.Intn(len(svcs)) + return svcs[idx], nil +} + +// discoverInstances returns all instances for the given discoveryType, +// namespace and name. The returned service entries may or may not be shuffled +func (c *AgentClient) discoverInstances(ctx context.Context, discoverType, + namespace, name string) ([]*api.ServiceEntry, error) { + + q := &api.QueryOptions{ + // TODO(banks): make this configurable? + AllowStale: true, + } + q = q.WithContext(ctx) + + switch discoverType { + case "service": + svcs, _, err := c.agent.Health().Connect(name, "", true, q) + if err != nil { + return nil, err + } + return svcs, err + + case "prepared_query": + // TODO(banks): it's not super clear to me how this should work eventually. + // How do we distinguise between a PreparedQuery for the actual services and + // one that should return the connect proxies where that differs? If we + // can't then we end up with a janky UX where user specifies a reasonable + // prepared query but we try to connect to non-connect services and fail + // with a confusing TLS error. Maybe just a way to filter PreparedQuery + // results by connect-enabled would be sufficient (or even metadata to do + // that ourselves in the response although less efficient). + resp, _, err := c.agent.PreparedQuery().Execute(name, q) + if err != nil { + return nil, err + } + + // Awkward, we have a slice of api.ServiceEntry here but want a slice of + // *api.ServiceEntry for compat with Connect/Service APIs. Have to convert + // them to keep things type-happy. + svcs := make([]*api.ServiceEntry, len(resp.Nodes)) + for idx, se := range resp.Nodes { + svcs[idx] = &se + } + return svcs, err + default: + return nil, fmt.Errorf("unsupported discovery type: %s", discoverType) + } +} diff --git a/connect/client_test.go b/connect/client_test.go new file mode 100644 index 0000000000..fcb18e6000 --- /dev/null +++ b/connect/client_test.go @@ -0,0 +1,148 @@ +package connect + +import ( + "context" + "crypto/x509" + "crypto/x509/pkix" + "encoding/asn1" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/testutil" + "github.com/stretchr/testify/require" +) + +func TestNewInsecureDevClientWithLocalCerts(t *testing.T) { + + agent, err := api.NewClient(api.DefaultConfig()) + require.Nil(t, err) + + got, err := NewInsecureDevClientWithLocalCerts(agent, + "testdata/ca1-ca-consul-internal.cert.pem", + "testdata/ca1-svc-web.cert.pem", + "testdata/ca1-svc-web.key.pem", + ) + require.Nil(t, err) + + // Sanity check correct certs were loaded + serverCfg, err := got.ServerTLSConfig() + require.Nil(t, err) + caSubjects := serverCfg.RootCAs.Subjects() + require.Len(t, caSubjects, 1) + caSubject, err := testNameFromRawDN(caSubjects[0]) + require.Nil(t, err) + require.Equal(t, "Consul Internal", caSubject.CommonName) + + require.Len(t, serverCfg.Certificates, 1) + cert, err := x509.ParseCertificate(serverCfg.Certificates[0].Certificate[0]) + require.Nil(t, err) + require.Equal(t, "web", cert.Subject.CommonName) +} + +func testNameFromRawDN(raw []byte) (*pkix.Name, error) { + var seq pkix.RDNSequence + if _, err := asn1.Unmarshal(raw, &seq); err != nil { + return nil, err + } + + var name pkix.Name + name.FillFromRDNSequence(&seq) + return &name, nil +} + +func testAgent(t *testing.T) (*testutil.TestServer, *api.Client) { + t.Helper() + + // Make client config + conf := api.DefaultConfig() + + // Create server + server, err := testutil.NewTestServerConfigT(t, nil) + require.Nil(t, err) + + conf.Address = server.HTTPAddr + + // Create client + agent, err := api.NewClient(conf) + require.Nil(t, err) + + return server, agent +} + +func testService(t *testing.T, ca, name string, client *api.Client) *httptest.Server { + t.Helper() + + // Run a test service to discover + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("svc: " + name)) + })) + server.TLS = TestTLSConfig(t, ca, name) + server.StartTLS() + + u, err := url.Parse(server.URL) + require.Nil(t, err) + + port, err := strconv.Atoi(u.Port()) + require.Nil(t, err) + + // If client is passed, register the test service instance + if client != nil { + svc := &api.AgentServiceRegistration{ + // TODO(banks): we don't really have a good way to represent + // connect-native apps yet so we have to pretend out little server is a + // proxy for now. + Kind: api.ServiceKindConnectProxy, + ProxyDestination: name, + Name: name + "-proxy", + Address: u.Hostname(), + Port: port, + } + err := client.Agent().ServiceRegister(svc) + require.Nil(t, err) + } + + return server +} + +func TestDialService(t *testing.T) { + consulServer, agent := testAgent(t) + defer consulServer.Stop() + + svc := testService(t, "ca1", "web", agent) + defer svc.Close() + + c, err := NewInsecureDevClientWithLocalCerts(agent, + "testdata/ca1-ca-consul-internal.cert.pem", + "testdata/ca1-svc-web.cert.pem", + "testdata/ca1-svc-web.key.pem", + ) + require.Nil(t, err) + + conn, err := c.DialService(context.Background(), "default", "web") + require.Nilf(t, err, "err: %s", err) + + // Inject our conn into http.Transport + httpClient := &http.Client{ + Transport: &http.Transport{ + DialTLS: func(network, addr string) (net.Conn, error) { + return conn, nil + }, + }, + } + + // Don't be fooled the hostname here is ignored since we did the dialling + // ourselves + resp, err := httpClient.Get("https://web.connect.consul/") + require.Nil(t, err) + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + require.Nil(t, err) + + require.Equal(t, "svc: web", string(body)) +} diff --git a/connect/testdata/ca1-ca-consul-internal.cert.pem b/connect/testdata/ca1-ca-consul-internal.cert.pem new file mode 100644 index 0000000000..6a557775f9 --- /dev/null +++ b/connect/testdata/ca1-ca-consul-internal.cert.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICIDCCAcagAwIBAgIBATAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg +SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAaMRgwFgYD +VQQDEw9Db25zdWwgSW50ZXJuYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAT3 +IPiDHugKYEVaSpIzBjqU5lQrmirC6N1XHyOAhF2psGGxcxezpf8Vgy5Iv6XbmeHr +cttyzUYtUKhrFBhxkPYRo4H8MIH5MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8E +BTADAQH/MCkGA1UdDgQiBCCrnNQy2IQS73Co9WbrPXtq/YP9SvIBOJ8iYRWTOxjC +qTArBgNVHSMEJDAigCCrnNQy2IQS73Co9WbrPXtq/YP9SvIBOJ8iYRWTOxjCqTA/ +BgNVHREEODA2hjRzcGlmZmU6Ly8xMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1 +NTU1NTU1NTUuY29uc3VsMD0GA1UdHgEB/wQzMDGgLzAtgisxMTExMTExMS0yMjIy +LTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqGSM49BAMCA0gAMEUC +IQDwWL6ZuszKrZjSJwDzdhRQtj1ppezJrKaDTJx+4F/tyQIgEaQCR935ztIqZzgO +Ka6ozcH2Ubd4j4cDC1XswVMW6zs= +-----END CERTIFICATE----- diff --git a/connect/testdata/ca1-ca-consul-internal.key.pem b/connect/testdata/ca1-ca-consul-internal.key.pem new file mode 100644 index 0000000000..8c40fd26bd --- /dev/null +++ b/connect/testdata/ca1-ca-consul-internal.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIDUDO3I7WKbLTTWkNKA4unB2RLq/RX+L+XIFssDE/AD7oAoGCCqGSM49 +AwEHoUQDQgAE9yD4gx7oCmBFWkqSMwY6lOZUK5oqwujdVx8jgIRdqbBhsXMXs6X/ +FYMuSL+l25nh63Lbcs1GLVCoaxQYcZD2EQ== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca1-svc-cache.cert.pem b/connect/testdata/ca1-svc-cache.cert.pem new file mode 100644 index 0000000000..097a2b6a6f --- /dev/null +++ b/connect/testdata/ca1-svc-cache.cert.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICEDCCAbagAwIBAgIBBTAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg +SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAQMQ4wDAYD +VQQDEwVjYWNoZTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABOWw8369v4DHJAI6 +k061hU8rxaQs87mZFQ52JfleJjRoDUuZIPLhZHMFbvbI8pDWi7YdjluNbzNNh6nu +fAivylujgfYwgfMwDgYDVR0PAQH/BAQDAgO4MB0GA1UdJQQWMBQGCCsGAQUFBwMC +BggrBgEFBQcDATAMBgNVHRMBAf8EAjAAMCkGA1UdDgQiBCCHhMqV2/R8meSsXtwh +OLC9hP7WQfuvwJ6V6uwKZdEofTArBgNVHSMEJDAigCCrnNQy2IQS73Co9WbrPXtq +/YP9SvIBOJ8iYRWTOxjCqTBcBgNVHREEVTBThlFzcGlmZmU6Ly8xMTExMTExMS0y +MjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsL25zL2RlZmF1bHQvZGMv +ZGMwMS9zdmMvY2FjaGUwCgYIKoZIzj0EAwIDSAAwRQIgPfekKBd/ltpVkdjnB0Hp +cV9HPwy12tXp4suR2nspSNkCIQD1Th/hvxuBKkRYy9Bl+jgTbrFdd4fLCWPeFbaM +sgLK7g== +-----END CERTIFICATE----- diff --git a/connect/testdata/ca1-svc-cache.key.pem b/connect/testdata/ca1-svc-cache.key.pem new file mode 100644 index 0000000000..f780f63db8 --- /dev/null +++ b/connect/testdata/ca1-svc-cache.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIPTSPV2cWNnO69f+vYyCg5frpoBtK6L+kZVLrGCv3TdnoAoGCCqGSM49 +AwEHoUQDQgAE5bDzfr2/gMckAjqTTrWFTyvFpCzzuZkVDnYl+V4mNGgNS5kg8uFk +cwVu9sjykNaLth2OW41vM02Hqe58CK/KWw== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca1-svc-db.cert.pem b/connect/testdata/ca1-svc-db.cert.pem new file mode 100644 index 0000000000..d00a38ea08 --- /dev/null +++ b/connect/testdata/ca1-svc-db.cert.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICCjCCAbCgAwIBAgIBBDAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg +SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjANMQswCQYD +VQQDEwJkYjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABEcTyr2l7yYWZuh++02M +usR20QrZtHdd7goKmYrIpQ3ekmHuLLgJWgTTaIhCj8fzbryep+s8oM7EiPhRQ14l +uSujgfMwgfAwDgYDVR0PAQH/BAQDAgO4MB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr +BgEFBQcDATAMBgNVHRMBAf8EAjAAMCkGA1UdDgQiBCAy6jHCBBT2bii+aMJCDJ33 +bFJtR72bxDBUi5b+YWyWwDArBgNVHSMEJDAigCCrnNQy2IQS73Co9WbrPXtq/YP9 +SvIBOJ8iYRWTOxjCqTBZBgNVHREEUjBQhk5zcGlmZmU6Ly8xMTExMTExMS0yMjIy +LTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsL25zL2RlZmF1bHQvZGMvZGMw +MS9zdmMvZGIwCgYIKoZIzj0EAwIDSAAwRQIhALCW4cOEpuYfLJ0NGwEmYG5Fko0N +WMccL0gEQzKUbIWrAiAIw8wkTSf1O8vTHeKdR1fCmdVoDRFRKB643PaofUzFxA== +-----END CERTIFICATE----- diff --git a/connect/testdata/ca1-svc-db.key.pem b/connect/testdata/ca1-svc-db.key.pem new file mode 100644 index 0000000000..3ec23a33b7 --- /dev/null +++ b/connect/testdata/ca1-svc-db.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIMHv1pjt75IjKXzl8l4rBtEFS1pEuOM4WNgeHg5Qn1RroAoGCCqGSM49 +AwEHoUQDQgAERxPKvaXvJhZm6H77TYy6xHbRCtm0d13uCgqZisilDd6SYe4suAla +BNNoiEKPx/NuvJ6n6zygzsSI+FFDXiW5Kw== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca1-svc-web.cert.pem b/connect/testdata/ca1-svc-web.cert.pem new file mode 100644 index 0000000000..a786a2c06a --- /dev/null +++ b/connect/testdata/ca1-svc-web.cert.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICDDCCAbKgAwIBAgIBAzAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg +SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAOMQwwCgYD +VQQDEwN3ZWIwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARF47lERGXziNBC74Kh +U3W29/M7JO9LIUaJgK0LJbhgf0MuPxf7gX+PnxH5ImI5yfXRv0SSxeCq7377IkXP +XS6Fo4H0MIHxMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcDAgYI +KwYBBQUHAwEwDAYDVR0TAQH/BAIwADApBgNVHQ4EIgQg26hfNYiVwYRm7CQJvdOd +NIOmG3t8vNwXCtktC782cf8wKwYDVR0jBCQwIoAgq5zUMtiEEu9wqPVm6z17av2D +/UryATifImEVkzsYwqkwWgYDVR0RBFMwUYZPc3BpZmZlOi8vMTExMTExMTEtMjIy +Mi0zMzMzLTQ0NDQtNTU1NTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2Rj +MDEvc3ZjL3dlYjAKBggqhkjOPQQDAgNIADBFAiAzi8uBs+ApPfAZZm5eO/hhVZiv +E8p84VKCqPeF3tFfoAIhANVkdSnp2AKU5T7SlJHmieu3DFNWCVpajlHJvf286J94 +-----END CERTIFICATE----- diff --git a/connect/testdata/ca1-svc-web.key.pem b/connect/testdata/ca1-svc-web.key.pem new file mode 100644 index 0000000000..8ed82c13c7 --- /dev/null +++ b/connect/testdata/ca1-svc-web.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIPOIj4BFS0fknG+uAVKZIWRpnzp7O3OKpBDgEmuml7lcoAoGCCqGSM49 +AwEHoUQDQgAEReO5RERl84jQQu+CoVN1tvfzOyTvSyFGiYCtCyW4YH9DLj8X+4F/ +j58R+SJiOcn10b9EksXgqu9++yJFz10uhQ== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca2-ca-vault.cert.pem b/connect/testdata/ca2-ca-vault.cert.pem new file mode 100644 index 0000000000..a7f6174686 --- /dev/null +++ b/connect/testdata/ca2-ca-vault.cert.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICDDCCAbKgAwIBAgIBAjAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe +Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMBAxDjAMBgNVBAMTBVZhdWx0 +MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEAjGVnRy/7Q2SU4ePbKbsurRAHKYA +CuA3r9QrowgZOr7yptF54shiobMIORpfKYkoYkhzL1lhWKI06BUJ4xuPd6OB/DCB ++TAOBgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zApBgNVHQ4EIgQgqEc5 +ZrELD5ySxapbU+eRb+aEv1MEoCvjC0mCA1uJecMwKwYDVR0jBCQwIoAgqEc5ZrEL +D5ySxapbU+eRb+aEv1MEoCvjC0mCA1uJecMwPwYDVR0RBDgwNoY0c3BpZmZlOi8v +MTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1NTU1NTU1NTU1LmNvbnN1bDA9BgNV +HR4BAf8EMzAxoC8wLYIrMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1NTU1NTU1 +NTU1LmNvbnN1bDAKBggqhkjOPQQDAgNIADBFAiEA6pBdeglhq//A7sYnYk85XL+3 +4IDrXrGN3KjC9qo3J9ICIDS9pEoTPWAWDfn1ccPafKVBrJm6KrmljcvymQ2QUDIZ +-----END CERTIFICATE----- +---- diff --git a/connect/testdata/ca2-ca-vault.key.pem b/connect/testdata/ca2-ca-vault.key.pem new file mode 100644 index 0000000000..43534b961b --- /dev/null +++ b/connect/testdata/ca2-ca-vault.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIKnuCctuvtyzf+M6B8jGqejG4T5o7NMRYjO2M3dZITCboAoGCCqGSM49 +AwEHoUQDQgAEAjGVnRy/7Q2SU4ePbKbsurRAHKYACuA3r9QrowgZOr7yptF54shi +obMIORpfKYkoYkhzL1lhWKI06BUJ4xuPdw== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca2-svc-cache.cert.pem b/connect/testdata/ca2-svc-cache.cert.pem new file mode 100644 index 0000000000..32110e232b --- /dev/null +++ b/connect/testdata/ca2-svc-cache.cert.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICBzCCAaygAwIBAgIBCDAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe +Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMBAxDjAMBgNVBAMTBWNhY2hl +MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyB6D+Eqi/71EhUrBWlcZOV2vjS9Y +xnUQ3jfH+QUZur7WOuGLnO7eArbAHcDbqKGyDWxlkZH04sGYOXaEW7UUd6OB9jCB +8zAOBgNVHQ8BAf8EBAMCA7gwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMB +MAwGA1UdEwEB/wQCMAAwKQYDVR0OBCIEIGapiHFxlbYbNKFlwdPMpKhIypvNZXo8 +k/OZLki/vurQMCsGA1UdIwQkMCKAIKhHOWaxCw+cksWqW1PnkW/mhL9TBKAr4wtJ +ggNbiXnDMFwGA1UdEQRVMFOGUXNwaWZmZTovLzExMTExMTExLTIyMjItMzMzMy00 +NDQ0LTU1NTU1NTU1NTU1NS5jb25zdWwvbnMvZGVmYXVsdC9kYy9kYzAxL3N2Yy9j +YWNoZTAKBggqhkjOPQQDAgNJADBGAiEA/vRLXbkigS6l89MxFk0RFE7Zo4vorv7s +E1juCOsVJBICIQDXlpmYH9fPon6DYMyOxQttNjkuWbJgnPv7rPg+CllRyA== +-----END CERTIFICATE----- diff --git a/connect/testdata/ca2-svc-cache.key.pem b/connect/testdata/ca2-svc-cache.key.pem new file mode 100644 index 0000000000..cabad8179d --- /dev/null +++ b/connect/testdata/ca2-svc-cache.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIEbQOv4odF2Tu8ZnJTJuytvOd2HOF9HxgGw5ei1pkP4moAoGCCqGSM49 +AwEHoUQDQgAEyB6D+Eqi/71EhUrBWlcZOV2vjS9YxnUQ3jfH+QUZur7WOuGLnO7e +ArbAHcDbqKGyDWxlkZH04sGYOXaEW7UUdw== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca2-svc-db.cert.pem b/connect/testdata/ca2-svc-db.cert.pem new file mode 100644 index 0000000000..33273058a4 --- /dev/null +++ b/connect/testdata/ca2-svc-db.cert.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICADCCAaagAwIBAgIBBzAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe +Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMA0xCzAJBgNVBAMTAmRiMFkw +EwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEFeB4DynO6IeKOE4zFLlBVFv+4HeWRvK3 +6cQ9L6v5uhLfdcYyqhT/QLbQ4R8ks1vUTTiq0XJsAGdkvkt71fiEl6OB8zCB8DAO +BgNVHQ8BAf8EBAMCA7gwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMAwG +A1UdEwEB/wQCMAAwKQYDVR0OBCIEIKjVz8n91cej8q6WpDNd0hwSMAE2ddY056PH +hMfaBM6GMCsGA1UdIwQkMCKAIKhHOWaxCw+cksWqW1PnkW/mhL9TBKAr4wtJggNb +iXnDMFkGA1UdEQRSMFCGTnNwaWZmZTovLzExMTExMTExLTIyMjItMzMzMy00NDQ0 +LTU1NTU1NTU1NTU1NS5jb25zdWwvbnMvZGVmYXVsdC9kYy9kYzAxL3N2Yy9kYjAK +BggqhkjOPQQDAgNIADBFAiAdYkokbeZr7W32NhjcNoTMNwpz9CqJpK6Yzu4N6EJc +pAIhALHpRM57zdiMouDOlhGPX5XKzbSl2AnBjFvbPqgFV09Z +-----END CERTIFICATE----- diff --git a/connect/testdata/ca2-svc-db.key.pem b/connect/testdata/ca2-svc-db.key.pem new file mode 100644 index 0000000000..7f7ab9ff81 --- /dev/null +++ b/connect/testdata/ca2-svc-db.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIHnzia+DNTFB7uYQEuWvLR2czGCuDfOTt1FfcBo1uBJioAoGCCqGSM49 +AwEHoUQDQgAEFeB4DynO6IeKOE4zFLlBVFv+4HeWRvK36cQ9L6v5uhLfdcYyqhT/ +QLbQ4R8ks1vUTTiq0XJsAGdkvkt71fiElw== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca2-svc-web.cert.pem b/connect/testdata/ca2-svc-web.cert.pem new file mode 100644 index 0000000000..ae1e338f66 --- /dev/null +++ b/connect/testdata/ca2-svc-web.cert.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICAjCCAaigAwIBAgIBBjAKBggqhkjOPQQDAjAQMQ4wDAYDVQQDEwVWYXVsdDAe +Fw0xODAzMjMyMjA0MjVaFw0yODAzMjAyMjA0MjVaMA4xDDAKBgNVBAMTA3dlYjBZ +MBMGByqGSM49AgEGCCqGSM49AwEHA0IABM9XzxWFCa80uQDfJEGboUC15Yr+FwDp +OemThalQxFpkL7gQSIgpzgGULIx+jCiu+clJ0QhbWT2dnS8vFUKq35qjgfQwgfEw +DgYDVR0PAQH/BAQDAgO4MB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATAM +BgNVHRMBAf8EAjAAMCkGA1UdDgQiBCCN+TKHPCOr48hxRCx4rqbWQg5QHkCSNzjZ +qi1JGs13njArBgNVHSMEJDAigCCoRzlmsQsPnJLFqltT55Fv5oS/UwSgK+MLSYID +W4l5wzBaBgNVHREEUzBRhk9zcGlmZmU6Ly8xMTExMTExMS0yMjIyLTMzMzMtNDQ0 +NC01NTU1NTU1NTU1NTUuY29uc3VsL25zL2RlZmF1bHQvZGMvZGMwMS9zdmMvd2Vi +MAoGCCqGSM49BAMCA0gAMEUCIBd6gpL6E8rms5BU+cJeeyv0Rjc18edn2g3q2wLN +r1zAAiEAv16whKwR0DeKkldGLDQIu9nCNvfDZrEWgywIBYbzLxY= +-----END CERTIFICATE----- diff --git a/connect/testdata/ca2-svc-web.key.pem b/connect/testdata/ca2-svc-web.key.pem new file mode 100644 index 0000000000..65f0bc48e6 --- /dev/null +++ b/connect/testdata/ca2-svc-web.key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIOCMjjRexX3qHjixpRwLxggJd9yuskqUoPy8/MepafP+oAoGCCqGSM49 +AwEHoUQDQgAEz1fPFYUJrzS5AN8kQZuhQLXliv4XAOk56ZOFqVDEWmQvuBBIiCnO +AZQsjH6MKK75yUnRCFtZPZ2dLy8VQqrfmg== +-----END EC PRIVATE KEY----- diff --git a/connect/testdata/ca2-xc-by-ca1.cert.pem b/connect/testdata/ca2-xc-by-ca1.cert.pem new file mode 100644 index 0000000000..e864f6c00f --- /dev/null +++ b/connect/testdata/ca2-xc-by-ca1.cert.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICFjCCAbygAwIBAgIBAjAKBggqhkjOPQQDAjAaMRgwFgYDVQQDEw9Db25zdWwg +SW50ZXJuYWwwHhcNMTgwMzIzMjIwNDI1WhcNMjgwMzIwMjIwNDI1WjAQMQ4wDAYD +VQQDEwVWYXVsdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABAIxlZ0cv+0NklOH +j2ym7Lq0QBymAArgN6/UK6MIGTq+8qbReeLIYqGzCDkaXymJKGJIcy9ZYViiNOgV +CeMbj3ejgfwwgfkwDgYDVR0PAQH/BAQDAgGGMA8GA1UdEwEB/wQFMAMBAf8wKQYD +VR0OBCIEIKhHOWaxCw+cksWqW1PnkW/mhL9TBKAr4wtJggNbiXnDMCsGA1UdIwQk +MCKAIKuc1DLYhBLvcKj1Zus9e2r9g/1K8gE4nyJhFZM7GMKpMD8GA1UdEQQ4MDaG +NHNwaWZmZTovLzExMTExMTExLTIyMjItMzMzMy00NDQ0LTU1NTU1NTU1NTU1NS5j +b25zdWwwPQYDVR0eAQH/BDMwMaAvMC2CKzExMTExMTExLTIyMjItMzMzMy00NDQ0 +LTU1NTU1NTU1NTU1NS5jb25zdWwwCgYIKoZIzj0EAwIDSAAwRQIgWWWj8/6SaY2y +wzOtIphwZLewCuLMG6KG8uY4S7UsosgCIQDhCbT/LUKq/A21khQncBmM79ng9Gbx +/4Zw8zbVmnZJKg== +-----END CERTIFICATE----- diff --git a/connect/testdata/mkcerts.go b/connect/testdata/mkcerts.go new file mode 100644 index 0000000000..7fe82f53a6 --- /dev/null +++ b/connect/testdata/mkcerts.go @@ -0,0 +1,243 @@ +package main + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/sha256" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "log" + "math/big" + "net/url" + "os" + "regexp" + "strings" + "time" +) + +// You can verify a given leaf with a given root using: +// +// $ openssl verify -verbose -CAfile ca2-ca-vault.cert.pem ca1-svc-db.cert.pem +// +// Note that to verify via the cross-signed intermediate, openssl requires it to +// be bundled with the _root_ CA bundle and will ignore the cert if it's passed +// with the subject. You can do that with: +// +// $ openssl verify -verbose -CAfile \ +// <(cat ca1-ca-consul-internal.cert.pem ca2-xc-by-ca1.cert.pem) \ +// ca2-svc-db.cert.pem +// ca2-svc-db.cert.pem: OK +// +// Note that the same leaf and root without the intermediate should fail: +// +// $ openssl verify -verbose -CAfile ca1-ca-consul-internal.cert.pem ca2-svc-db.cert.pem +// ca2-svc-db.cert.pem: CN = db +// error 20 at 0 depth lookup:unable to get local issuer certificate +// +// NOTE: THIS IS A QUIRK OF OPENSSL; in Connect we will distribute the roots +// alone and stable intermediates like the XC cert to the _leaf_. + +var clusterID = "11111111-2222-3333-4444-555555555555" +var cAs = []string{"Consul Internal", "Vault"} +var services = []string{"web", "db", "cache"} +var slugRe = regexp.MustCompile("[^a-zA-Z0-9]+") +var serial int64 + +type caInfo struct { + id int + name string + slug string + uri *url.URL + pk *ecdsa.PrivateKey + cert *x509.Certificate +} + +func main() { + // Make CA certs + caInfos := make(map[string]caInfo) + var previousCA *caInfo + for idx, name := range cAs { + ca := caInfo{ + id: idx + 1, + name: name, + slug: strings.ToLower(slugRe.ReplaceAllString(name, "-")), + } + pk, err := makePK(fmt.Sprintf("ca%d-ca-%s.key.pem", ca.id, ca.slug)) + if err != nil { + log.Fatal(err) + } + ca.pk = pk + caURI, err := url.Parse(fmt.Sprintf("spiffe://%s.consul", clusterID)) + if err != nil { + log.Fatal(err) + } + ca.uri = caURI + cert, err := makeCACert(ca, previousCA) + if err != nil { + log.Fatal(err) + } + ca.cert = cert + caInfos[name] = ca + previousCA = &ca + } + + // For each CA, make a leaf cert for each service + for _, ca := range caInfos { + for _, svc := range services { + _, err := makeLeafCert(ca, svc) + if err != nil { + log.Fatal(err) + } + } + } +} + +func makePK(path string) (*ecdsa.PrivateKey, error) { + log.Printf("Writing PK file: %s", path) + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return nil, err + } + + bs, err := x509.MarshalECPrivateKey(priv) + if err != nil { + return nil, err + } + + err = writePEM(path, "EC PRIVATE KEY", bs) + return priv, nil +} + +func makeCACert(ca caInfo, previousCA *caInfo) (*x509.Certificate, error) { + path := fmt.Sprintf("ca%d-ca-%s.cert.pem", ca.id, ca.slug) + log.Printf("Writing CA cert file: %s", path) + serial++ + subj := pkix.Name{ + CommonName: ca.name, + } + template := x509.Certificate{ + SerialNumber: big.NewInt(serial), + Subject: subj, + // New in go 1.10 + URIs: []*url.URL{ca.uri}, + // Add DNS name constraint + PermittedDNSDomainsCritical: true, + PermittedDNSDomains: []string{ca.uri.Hostname()}, + SignatureAlgorithm: x509.ECDSAWithSHA256, + BasicConstraintsValid: true, + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature, + IsCA: true, + NotAfter: time.Now().Add(10 * 365 * 24 * time.Hour), + NotBefore: time.Now(), + AuthorityKeyId: keyID(&ca.pk.PublicKey), + SubjectKeyId: keyID(&ca.pk.PublicKey), + } + bs, err := x509.CreateCertificate(rand.Reader, &template, &template, + &ca.pk.PublicKey, ca.pk) + if err != nil { + return nil, err + } + + err = writePEM(path, "CERTIFICATE", bs) + if err != nil { + return nil, err + } + + cert, err := x509.ParseCertificate(bs) + if err != nil { + return nil, err + } + + if previousCA != nil { + // Also create cross-signed cert as we would use during rotation between + // previous CA and this one. + template.AuthorityKeyId = keyID(&previousCA.pk.PublicKey) + bs, err := x509.CreateCertificate(rand.Reader, &template, + previousCA.cert, &ca.pk.PublicKey, previousCA.pk) + if err != nil { + return nil, err + } + + path := fmt.Sprintf("ca%d-xc-by-ca%d.cert.pem", ca.id, previousCA.id) + err = writePEM(path, "CERTIFICATE", bs) + if err != nil { + return nil, err + } + } + + return cert, err +} + +func keyID(pub *ecdsa.PublicKey) []byte { + // This is not standard; RFC allows any unique identifier as long as they + // match in subject/authority chains but suggests specific hashing of DER + // bytes of public key including DER tags. I can't be bothered to do esp. + // since ECDSA keys don't have a handy way to marshal the publick key alone. + h := sha256.New() + h.Write(pub.X.Bytes()) + h.Write(pub.Y.Bytes()) + return h.Sum([]byte{}) +} + +func makeLeafCert(ca caInfo, svc string) (*x509.Certificate, error) { + svcURI := ca.uri + svcURI.Path = "/ns/default/dc/dc01/svc/" + svc + + keyPath := fmt.Sprintf("ca%d-svc-%s.key.pem", ca.id, svc) + cPath := fmt.Sprintf("ca%d-svc-%s.cert.pem", ca.id, svc) + + pk, err := makePK(keyPath) + if err != nil { + return nil, err + } + + log.Printf("Writing Service Cert: %s", cPath) + + serial++ + subj := pkix.Name{ + CommonName: svc, + } + template := x509.Certificate{ + SerialNumber: big.NewInt(serial), + Subject: subj, + // New in go 1.10 + URIs: []*url.URL{svcURI}, + SignatureAlgorithm: x509.ECDSAWithSHA256, + BasicConstraintsValid: true, + KeyUsage: x509.KeyUsageDataEncipherment | + x509.KeyUsageKeyAgreement | x509.KeyUsageDigitalSignature | + x509.KeyUsageKeyEncipherment, + ExtKeyUsage: []x509.ExtKeyUsage{ + x509.ExtKeyUsageClientAuth, + x509.ExtKeyUsageServerAuth, + }, + NotAfter: time.Now().Add(10 * 365 * 24 * time.Hour), + NotBefore: time.Now(), + AuthorityKeyId: keyID(&ca.pk.PublicKey), + SubjectKeyId: keyID(&pk.PublicKey), + } + bs, err := x509.CreateCertificate(rand.Reader, &template, ca.cert, + &pk.PublicKey, ca.pk) + if err != nil { + return nil, err + } + + err = writePEM(cPath, "CERTIFICATE", bs) + if err != nil { + return nil, err + } + + return x509.ParseCertificate(bs) +} + +func writePEM(name, typ string, bs []byte) error { + f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return err + } + defer f.Close() + return pem.Encode(f, &pem.Block{Type: typ, Bytes: bs}) +} diff --git a/connect/testing.go b/connect/testing.go new file mode 100644 index 0000000000..90db332a2a --- /dev/null +++ b/connect/testing.go @@ -0,0 +1,88 @@ +package connect + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "path" + "path/filepath" + "runtime" + + "github.com/mitchellh/go-testing-interface" + "github.com/stretchr/testify/require" +) + +// testDataDir is a janky temporary hack to allow use of these methods from +// proxy package. We need to revisit where all this lives since it logically +// overlaps with consul/agent in Mitchell's PR and that one generates certs on +// the fly which will make this unecessary but I want to get things working for +// now with what I've got :). This wonderful heap kinda-sorta gets the path +// relative to _this_ file so it works even if the Test* method is being called +// from a test binary in another package dir. +func testDataDir() string { + _, filename, _, ok := runtime.Caller(0) + if !ok { + panic("no caller information") + } + return path.Dir(filename) + "/testdata" +} + +// TestCAPool returns an *x509.CertPool containing the named CA certs from the +// testdata dir. +func TestCAPool(t testing.T, caNames ...string) *x509.CertPool { + t.Helper() + pool := x509.NewCertPool() + for _, name := range caNames { + certs, err := filepath.Glob(testDataDir() + "/" + name + "-ca-*.cert.pem") + require.Nil(t, err) + for _, cert := range certs { + caPem, err := ioutil.ReadFile(cert) + require.Nil(t, err) + pool.AppendCertsFromPEM(caPem) + } + } + return pool +} + +// TestSvcKeyPair returns an tls.Certificate containing both cert and private +// key for a given service under a given CA from the testdata dir. +func TestSvcKeyPair(t testing.T, ca, name string) tls.Certificate { + t.Helper() + prefix := fmt.Sprintf(testDataDir()+"/%s-svc-%s", ca, name) + cert, err := tls.LoadX509KeyPair(prefix+".cert.pem", prefix+".key.pem") + require.Nil(t, err) + return cert +} + +// TestTLSConfig returns a *tls.Config suitable for use during tests. +func TestTLSConfig(t testing.T, ca, svc string) *tls.Config { + t.Helper() + return &tls.Config{ + Certificates: []tls.Certificate{TestSvcKeyPair(t, ca, svc)}, + MinVersion: tls.VersionTLS12, + RootCAs: TestCAPool(t, ca), + ClientCAs: TestCAPool(t, ca), + ClientAuth: tls.RequireAndVerifyClientCert, + // In real life we'll need to do this too since otherwise Go will attempt to + // verify DNS names match DNS SAN/CN which we don't want, but we'll hook + // VerifyPeerCertificates and do our own x509 path validation as well as + // AuthZ upcall. For now we are just testing the basic proxy mechanism so + // this is fine. + InsecureSkipVerify: true, + } +} + +// TestAuther is a simple Auther implementation that does nothing but what you +// tell it to! +type TestAuther struct { + // Return is the value returned from an Auth() call. Set it to nil to have all + // certificates unconditionally accepted or a value to have them fail. + Return error +} + +// Auth implements Auther +func (a *TestAuther) Auth(rawCerts [][]byte, + verifiedChains [][]*x509.Certificate) error { + return a.Return +} diff --git a/connect/tls.go b/connect/tls.go new file mode 100644 index 0000000000..af66d9c0c3 --- /dev/null +++ b/connect/tls.go @@ -0,0 +1,124 @@ +package connect + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "sync" +) + +// defaultTLSConfig returns the standard config for connect clients and servers. +func defaultTLSConfig() *tls.Config { + serverAuther := &ServerAuther{} + return &tls.Config{ + MinVersion: tls.VersionTLS12, + ClientAuth: tls.RequireAndVerifyClientCert, + // We don't have access to go internals that decide if AES hardware + // acceleration is available in order to prefer CHA CHA if not. So let's + // just always prefer AES for now. We can look into doing something uglier + // later like using an external lib for AES checking if it seems important. + // https://github.com/golang/go/blob/df91b8044dbe790c69c16058330f545be069cc1f/src/crypto/tls/common.go#L919:14 + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + }, + // We have to set this since otherwise Go will attempt to verify DNS names + // match DNS SAN/CN which we don't want. We hook up VerifyPeerCertificate to + // do our own path validation as well as Connect AuthZ. + InsecureSkipVerify: true, + // By default auth as if we are a server. Clients need to override this with + // an Auther that is performs correct validation of the server identity they + // intended to connect to. + VerifyPeerCertificate: serverAuther.Auth, + } +} + +// ReloadableTLSConfig exposes a tls.Config that can have it's certificates +// reloaded. This works by +type ReloadableTLSConfig struct { + mu sync.Mutex + + // cfg is the current config to use for new connections + cfg *tls.Config +} + +// NewReloadableTLSConfig returns a reloadable config currently set to base. The +// Auther used to verify certificates for incoming connections on a Server will +// just be copied from the VerifyPeerCertificate passed. Clients will need to +// pass a specific Auther instance when they call TLSConfig that is configured +// to perform the necessary validation of the server's identity. +func NewReloadableTLSConfig(base *tls.Config) *ReloadableTLSConfig { + return &ReloadableTLSConfig{cfg: base} +} + +// ServerTLSConfig returns a *tls.Config that will dynamically load certs for +// each inbound connection via the GetConfigForClient callback. +func (c *ReloadableTLSConfig) ServerTLSConfig() *tls.Config { + // Setup the basic one with current params even though we will be using + // different config for each new conn. + c.mu.Lock() + base := c.cfg + c.mu.Unlock() + + // Dynamically fetch the current config for each new inbound connection + base.GetConfigForClient = func(info *tls.ClientHelloInfo) (*tls.Config, error) { + return c.TLSConfig(nil), nil + } + + return base +} + +// TLSConfig returns the current value for the config. It is safe to call from +// any goroutine. The passed Auther is inserted into the config's +// VerifyPeerCertificate. Passing a nil Auther will leave the default one in the +// base config +func (c *ReloadableTLSConfig) TLSConfig(auther Auther) *tls.Config { + c.mu.Lock() + cfgCopy := c.cfg + c.mu.Unlock() + if auther != nil { + cfgCopy.VerifyPeerCertificate = auther.Auth + } + return cfgCopy +} + +// SetTLSConfig sets the config used for future connections. It is safe to call +// from any goroutine. +func (c *ReloadableTLSConfig) SetTLSConfig(cfg *tls.Config) error { + c.mu.Lock() + defer c.mu.Unlock() + c.cfg = cfg + return nil +} + +// devTLSConfigFromFiles returns a default TLS Config but with certs and CAs +// based on local files for dev. +func devTLSConfigFromFiles(caFile, certFile, + keyFile string) (*tls.Config, error) { + + roots := x509.NewCertPool() + + bs, err := ioutil.ReadFile(caFile) + if err != nil { + return nil, err + } + + roots.AppendCertsFromPEM(bs) + + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + + cfg := defaultTLSConfig() + + cfg.Certificates = []tls.Certificate{cert} + cfg.RootCAs = roots + cfg.ClientCAs = roots + + return cfg, nil +} diff --git a/connect/tls_test.go b/connect/tls_test.go new file mode 100644 index 0000000000..0c99df3adc --- /dev/null +++ b/connect/tls_test.go @@ -0,0 +1,45 @@ +package connect + +import ( + "crypto/tls" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReloadableTLSConfig(t *testing.T) { + base := TestTLSConfig(t, "ca1", "web") + + c := NewReloadableTLSConfig(base) + + a := &TestAuther{ + Return: nil, + } + + // The dynamic config should be the one we loaded, but with the passed auther + expect := base + expect.VerifyPeerCertificate = a.Auth + require.Equal(t, base, c.TLSConfig(a)) + + // The server config should return same too for new connections + serverCfg := c.ServerTLSConfig() + require.NotNil(t, serverCfg.GetConfigForClient) + got, err := serverCfg.GetConfigForClient(&tls.ClientHelloInfo{}) + require.Nil(t, err) + require.Equal(t, base, got) + + // Now change the config as if we just rotated to a new CA + new := TestTLSConfig(t, "ca2", "web") + err = c.SetTLSConfig(new) + require.Nil(t, err) + + // The dynamic config should be the one we loaded (with same auther due to nil) + require.Equal(t, new, c.TLSConfig(nil)) + + // The server config should return same too for new connections + serverCfg = c.ServerTLSConfig() + require.NotNil(t, serverCfg.GetConfigForClient) + got, err = serverCfg.GetConfigForClient(&tls.ClientHelloInfo{}) + require.Nil(t, err) + require.Equal(t, new, got) +} diff --git a/proxy/config.go b/proxy/config.go new file mode 100644 index 0000000000..a5958135ac --- /dev/null +++ b/proxy/config.go @@ -0,0 +1,111 @@ +package proxy + +import ( + "io/ioutil" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/hcl" +) + +// Config is the publicly configurable state for an entire proxy instance. It's +// mostly used as the format for the local-file config mode which is mostly for +// dev/testing. In normal use, different parts of this config are pulled from +// different locations (e.g. command line, agent config endpoint, agent +// certificate endpoints). +type Config struct { + // ProxyID is the identifier for this proxy as registered in Consul. It's only + // guaranteed to be unique per agent. + ProxyID string `json:"proxy_id" hcl:"proxy_id"` + + // Token is the authentication token provided for queries to the local agent. + Token string `json:"token" hcl:"token"` + + // ProxiedServiceName is the name of the service this proxy is representing. + ProxiedServiceName string `json:"proxied_service_name" hcl:"proxied_service_name"` + + // ProxiedServiceNamespace is the namespace of the service this proxy is + // representing. + ProxiedServiceNamespace string `json:"proxied_service_namespace" hcl:"proxied_service_namespace"` + + // PublicListener configures the mTLS listener. + PublicListener PublicListenerConfig `json:"public_listener" hcl:"public_listener"` + + // Upstreams configures outgoing proxies for remote connect services. + Upstreams []UpstreamConfig `json:"upstreams" hcl:"upstreams"` + + // DevCAFile allows passing the file path to PEM encoded root certificate + // bundle to be used in development instead of the ones supplied by Connect. + DevCAFile string `json:"dev_ca_file" hcl:"dev_ca_file"` + + // DevServiceCertFile allows passing the file path to PEM encoded service + // certificate (client and server) to be used in development instead of the + // ones supplied by Connect. + DevServiceCertFile string `json:"dev_service_cert_file" hcl:"dev_service_cert_file"` + + // DevServiceKeyFile allows passing the file path to PEM encoded service + // private key to be used in development instead of the ones supplied by + // Connect. + DevServiceKeyFile string `json:"dev_service_key_file" hcl:"dev_service_key_file"` +} + +// ConfigWatcher is a simple interface to allow dynamic configurations from +// plugggable sources. +type ConfigWatcher interface { + // Watch returns a channel that will deliver new Configs if something external + // provokes it. + Watch() <-chan *Config +} + +// StaticConfigWatcher is a simple ConfigWatcher that delivers a static Config +// once and then never changes it. +type StaticConfigWatcher struct { + ch chan *Config +} + +// NewStaticConfigWatcher returns a ConfigWatcher for a config that never +// changes. It assumes only one "watcher" will ever call Watch. The config is +// delivered on the first call but will never be delivered again to allow +// callers to call repeatedly (e.g. select in a loop). +func NewStaticConfigWatcher(cfg *Config) *StaticConfigWatcher { + sc := &StaticConfigWatcher{ + // Buffer it so we can queue up the config for first delivery. + ch: make(chan *Config, 1), + } + sc.ch <- cfg + return sc +} + +// Watch implements ConfigWatcher on a static configuration for compatibility. +// It returns itself on the channel once and then leaves it open. +func (sc *StaticConfigWatcher) Watch() <-chan *Config { + return sc.ch +} + +// ParseConfigFile parses proxy configuration form a file for local dev. +func ParseConfigFile(filename string) (*Config, error) { + bs, err := ioutil.ReadFile(filename) + if err != nil { + return nil, err + } + + var cfg Config + + err = hcl.Unmarshal(bs, &cfg) + if err != nil { + return nil, err + } + + return &cfg, nil +} + +// AgentConfigWatcher watches the local Consul agent for proxy config changes. +type AgentConfigWatcher struct { + client *api.Client +} + +// Watch implements ConfigWatcher. +func (w *AgentConfigWatcher) Watch() <-chan *Config { + watch := make(chan *Config) + // TODO implement me + return watch +} diff --git a/proxy/config_test.go b/proxy/config_test.go new file mode 100644 index 0000000000..89287d573e --- /dev/null +++ b/proxy/config_test.go @@ -0,0 +1,46 @@ +package proxy + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseConfigFile(t *testing.T) { + cfg, err := ParseConfigFile("testdata/config-kitchensink.hcl") + require.Nil(t, err) + + expect := &Config{ + ProxyID: "foo", + Token: "11111111-2222-3333-4444-555555555555", + ProxiedServiceName: "web", + ProxiedServiceNamespace: "default", + PublicListener: PublicListenerConfig{ + BindAddress: ":9999", + LocalServiceAddress: "127.0.0.1:5000", + LocalConnectTimeoutMs: 1000, + HandshakeTimeoutMs: 5000, + }, + Upstreams: []UpstreamConfig{ + { + LocalBindAddress: "127.0.0.1:6000", + DestinationName: "db", + DestinationNamespace: "default", + DestinationType: "service", + ConnectTimeoutMs: 10000, + }, + { + LocalBindAddress: "127.0.0.1:6001", + DestinationName: "geo-cache", + DestinationNamespace: "default", + DestinationType: "prepared_query", + ConnectTimeoutMs: 10000, + }, + }, + DevCAFile: "connect/testdata/ca1-ca-consul-internal.cert.pem", + DevServiceCertFile: "connect/testdata/ca1-svc-web.cert.pem", + DevServiceKeyFile: "connect/testdata/ca1-svc-web.key.pem", + } + + require.Equal(t, expect, cfg) +} diff --git a/proxy/conn.go b/proxy/conn.go new file mode 100644 index 0000000000..dfad81db76 --- /dev/null +++ b/proxy/conn.go @@ -0,0 +1,48 @@ +package proxy + +import ( + "io" + "net" + "sync/atomic" +) + +// Conn represents a single proxied TCP connection. +type Conn struct { + src, dst net.Conn + stopping int32 +} + +// NewConn returns a conn joining the two given net.Conn +func NewConn(src, dst net.Conn) *Conn { + return &Conn{ + src: src, + dst: dst, + stopping: 0, + } +} + +// Close closes both connections. +func (c *Conn) Close() { + atomic.StoreInt32(&c.stopping, 1) + c.src.Close() + c.dst.Close() +} + +// CopyBytes will continuously copy bytes in both directions between src and dst +// until either connection is closed. +func (c *Conn) CopyBytes() error { + defer c.Close() + + go func() { + // Need this since Copy is only guaranteed to stop when it's source reader + // (second arg) hits EOF or error but either conn might close first possibly + // causing this goroutine to exit but not the outer one. See TestSc + //defer c.Close() + io.Copy(c.dst, c.src) + }() + _, err := io.Copy(c.src, c.dst) + if atomic.LoadInt32(&c.stopping) == 1 { + return nil + } + return err +} diff --git a/proxy/conn_test.go b/proxy/conn_test.go new file mode 100644 index 0000000000..ac907238d0 --- /dev/null +++ b/proxy/conn_test.go @@ -0,0 +1,119 @@ +package proxy + +import ( + "bufio" + "net" + "testing" + + "github.com/stretchr/testify/require" +) + +// testConnSetup listens on a random TCP port and passes the accepted net.Conn +// back to test code on returned channel. It then creates a source and +// destination Conn. And a cleanup func +func testConnSetup(t *testing.T) (net.Conn, net.Conn, func()) { + t.Helper() + + l, err := net.Listen("tcp", "localhost:0") + require.Nil(t, err) + + ch := make(chan net.Conn, 1) + go func(ch chan net.Conn) { + src, err := l.Accept() + require.Nil(t, err) + ch <- src + }(ch) + + dst, err := net.Dial("tcp", l.Addr().String()) + require.Nil(t, err) + + src := <-ch + + stopper := func() { + l.Close() + src.Close() + dst.Close() + } + + return src, dst, stopper +} + +func TestConn(t *testing.T) { + src, dst, stop := testConnSetup(t) + defer stop() + + c := NewConn(src, dst) + + retCh := make(chan error, 1) + go func() { + retCh <- c.CopyBytes() + }() + + srcR := bufio.NewReader(src) + dstR := bufio.NewReader(dst) + + _, err := src.Write([]byte("ping 1\n")) + require.Nil(t, err) + _, err = dst.Write([]byte("ping 2\n")) + require.Nil(t, err) + + got, err := dstR.ReadString('\n') + require.Equal(t, "ping 1\n", got) + + got, err = srcR.ReadString('\n') + require.Equal(t, "ping 2\n", got) + + _, err = src.Write([]byte("pong 1\n")) + require.Nil(t, err) + _, err = dst.Write([]byte("pong 2\n")) + require.Nil(t, err) + + got, err = dstR.ReadString('\n') + require.Equal(t, "pong 1\n", got) + + got, err = srcR.ReadString('\n') + require.Equal(t, "pong 2\n", got) + + c.Close() + + ret := <-retCh + require.Nil(t, ret, "Close() should not cause error return") +} + +func TestConnSrcClosing(t *testing.T) { + src, dst, stop := testConnSetup(t) + defer stop() + + c := NewConn(src, dst) + retCh := make(chan error, 1) + go func() { + retCh <- c.CopyBytes() + }() + + // If we close the src conn, we expect CopyBytes to return and src to be + // closed too. No good way to assert that the conn is closed really other than + // assume the retCh receive will hand unless CopyBytes exits and that + // CopyBytes defers Closing both. i.e. if this test doesn't time out it's + // good! + src.Close() + <-retCh +} + +func TestConnDstClosing(t *testing.T) { + src, dst, stop := testConnSetup(t) + defer stop() + + c := NewConn(src, dst) + retCh := make(chan error, 1) + go func() { + retCh <- c.CopyBytes() + }() + + // If we close the dst conn, we expect CopyBytes to return and src to be + // closed too. No good way to assert that the conn is closed really other than + // assume the retCh receive will hand unless CopyBytes exits and that + // CopyBytes defers Closing both. i.e. if this test doesn't time out it's + // good! + dst.Close() + <-retCh +} diff --git a/proxy/manager.go b/proxy/manager.go new file mode 100644 index 0000000000..c22a1b7ff1 --- /dev/null +++ b/proxy/manager.go @@ -0,0 +1,140 @@ +package proxy + +import ( + "errors" + "log" + "os" +) + +var ( + // ErrExists is the error returned when adding a proxy that exists already. + ErrExists = errors.New("proxy with that name already exists") + // ErrNotExist is the error returned when removing a proxy that doesn't exist. + ErrNotExist = errors.New("proxy with that name doesn't exist") +) + +// Manager implements the logic for configuring and running a set of proxiers. +// Typically it's used to run one PublicListener and zero or more Upstreams. +type Manager struct { + ch chan managerCmd + + // stopped is used to signal the caller of StopAll when the run loop exits + // after stopping all runners. It's only closed. + stopped chan struct{} + + // runners holds the currently running instances. It should only by accessed + // from within the `run` goroutine. + runners map[string]*Runner + + logger *log.Logger +} + +type managerCmd struct { + name string + p Proxier + errCh chan error +} + +// NewManager creates a manager of proxier instances. +func NewManager() *Manager { + return NewManagerWithLogger(log.New(os.Stdout, "", log.LstdFlags)) +} + +// NewManagerWithLogger creates a manager of proxier instances with the +// specified logger. +func NewManagerWithLogger(logger *log.Logger) *Manager { + m := &Manager{ + ch: make(chan managerCmd), + stopped: make(chan struct{}), + runners: make(map[string]*Runner), + logger: logger, + } + go m.run() + return m +} + +// RunProxier starts a new Proxier instance in the manager. It is safe to call +// from separate goroutines. If there is already a running proxy with the same +// name it returns ErrExists. +func (m *Manager) RunProxier(name string, p Proxier) error { + cmd := managerCmd{ + name: name, + p: p, + errCh: make(chan error), + } + m.ch <- cmd + return <-cmd.errCh +} + +// StopProxier stops a Proxier instance by name. It is safe to call from +// separate goroutines. If the instance with that name doesn't exist it returns +// ErrNotExist. +func (m *Manager) StopProxier(name string) error { + cmd := managerCmd{ + name: name, + p: nil, + errCh: make(chan error), + } + m.ch <- cmd + return <-cmd.errCh +} + +// StopAll shuts down the manager instance and stops all running proxies. It is +// safe to call from any goroutine but must only be called once. +func (m *Manager) StopAll() error { + close(m.ch) + <-m.stopped + return nil +} + +// run is the main manager processing loop. It keeps all actions in a single +// goroutine triggered by channel commands to keep it simple to reason about +// lifecycle events for each proxy. +func (m *Manager) run() { + defer close(m.stopped) + + // range over channel blocks and loops on each message received until channel + // is closed. + for cmd := range m.ch { + if cmd.p == nil { + m.remove(&cmd) + } else { + m.add(&cmd) + } + } + + // Shutting down, Stop all the runners + for _, r := range m.runners { + r.Stop() + } +} + +// add the named proxier instance and stop it. Should only be called from the +// run loop. +func (m *Manager) add(cmd *managerCmd) { + // Check existing + if _, ok := m.runners[cmd.name]; ok { + cmd.errCh <- ErrExists + return + } + + // Start new runner + r := NewRunnerWithLogger(cmd.name, cmd.p, m.logger) + m.runners[cmd.name] = r + go r.Listen() + cmd.errCh <- nil +} + +// remove the named proxier instance and stop it. Should only be called from the +// run loop. +func (m *Manager) remove(cmd *managerCmd) { + // Fetch proxier by name + r, ok := m.runners[cmd.name] + if !ok { + cmd.errCh <- ErrNotExist + return + } + err := r.Stop() + delete(m.runners, cmd.name) + cmd.errCh <- err +} diff --git a/proxy/manager_test.go b/proxy/manager_test.go new file mode 100644 index 0000000000..d4fa8c5b48 --- /dev/null +++ b/proxy/manager_test.go @@ -0,0 +1,76 @@ +package proxy + +import ( + "fmt" + "net" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestManager(t *testing.T) { + m := NewManager() + + addrs := TestLocalBindAddrs(t, 3) + + for i := 0; i < len(addrs); i++ { + name := fmt.Sprintf("proxier-%d", i) + // Run proxy + err := m.RunProxier(name, &TestProxier{ + Addr: addrs[i], + Prefix: name + ": ", + }) + require.Nil(t, err) + } + + // Make sure each one is echoing correctly now all are running + for i := 0; i < len(addrs); i++ { + conn, err := net.Dial("tcp", addrs[i]) + require.Nil(t, err) + TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i)) + conn.Close() + } + + // Stop first proxier + err := m.StopProxier("proxier-0") + require.Nil(t, err) + + // We should fail to dial it now. Note that Runner.Stop is synchronous so + // there should be a strong guarantee that it's stopped listening by now. + _, err = net.Dial("tcp", addrs[0]) + require.NotNil(t, err) + + // Rest of proxiers should still be running + for i := 1; i < len(addrs); i++ { + conn, err := net.Dial("tcp", addrs[i]) + require.Nil(t, err) + TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i)) + conn.Close() + } + + // Stop non-existent proxier should fail + err = m.StopProxier("foo") + require.Equal(t, ErrNotExist, err) + + // Add already-running proxier should fail + err = m.RunProxier("proxier-1", &TestProxier{}) + require.Equal(t, ErrExists, err) + + // But rest should stay running + for i := 1; i < len(addrs); i++ { + conn, err := net.Dial("tcp", addrs[i]) + require.Nil(t, err) + TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i)) + conn.Close() + } + + // StopAll should stop everything + err = m.StopAll() + require.Nil(t, err) + + // Verify failures + for i := 0; i < len(addrs); i++ { + _, err = net.Dial("tcp", addrs[i]) + require.NotNilf(t, err, "proxier-%d should not be running", i) + } +} diff --git a/proxy/proxier.go b/proxy/proxier.go new file mode 100644 index 0000000000..23940c6ade --- /dev/null +++ b/proxy/proxier.go @@ -0,0 +1,32 @@ +package proxy + +import ( + "errors" + "net" +) + +// ErrStopped is returned for operations on a proxy that is stopped +var ErrStopped = errors.New("stopped") + +// ErrStopping is returned for operations on a proxy that is stopping +var ErrStopping = errors.New("stopping") + +// Proxier is an interface for managing different proxy implementations in a +// standard way. We have at least two different types of Proxier implementations +// needed: one for the incoming mTLS -> local proxy and another for each +// "upstream" service the app needs to talk out to (which listens locally and +// performs service discovery to find a suitable remote service). +type Proxier interface { + // Listener returns a net.Listener that is open and ready for use, the Proxy + // manager will arrange accepting new connections from it and passing them to + // the handler method. + Listener() (net.Listener, error) + + // HandleConn is called for each incoming connection accepted by the listener. + // It is called in it's own goroutine and should run until it hits an error. + // When stopping the Proxier, the manager will simply close the conn provided + // and expects an error to be eventually returned. Any time spent not blocked + // on the passed conn (for example doing service discovery) should therefore + // be time-bound so that shutdown can't stall forever. + HandleConn(conn net.Conn) error +} diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000000..a293466b82 --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,112 @@ +package proxy + +import ( + "context" + "log" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/connect" +) + +// Proxy implements the built-in connect proxy. +type Proxy struct { + proxyID, token string + + connect connect.Client + manager *Manager + cfgWatch ConfigWatcher + cfg *Config + + logger *log.Logger +} + +// NewFromConfigFile returns a Proxy instance configured just from a local file. +// This is intended mostly for development and bypasses the normal mechanisms +// for fetching config and certificates from the local agent. +func NewFromConfigFile(client *api.Client, filename string, + logger *log.Logger) (*Proxy, error) { + cfg, err := ParseConfigFile(filename) + if err != nil { + return nil, err + } + + connect, err := connect.NewInsecureDevClientWithLocalCerts(client, + cfg.DevCAFile, cfg.DevServiceCertFile, cfg.DevServiceKeyFile) + if err != nil { + return nil, err + } + + p := &Proxy{ + proxyID: cfg.ProxyID, + connect: connect, + manager: NewManagerWithLogger(logger), + cfgWatch: NewStaticConfigWatcher(cfg), + logger: logger, + } + return p, nil +} + +// New returns a Proxy with the given id, consuming the provided (configured) +// agent. It is ready to Run(). +func New(client *api.Client, proxyID string, logger *log.Logger) (*Proxy, error) { + p := &Proxy{ + proxyID: proxyID, + connect: connect.NewClient(client), + manager: NewManagerWithLogger(logger), + cfgWatch: &AgentConfigWatcher{client: client}, + logger: logger, + } + return p, nil +} + +// Run the proxy instance until a fatal error occurs or ctx is cancelled. +func (p *Proxy) Run(ctx context.Context) error { + defer p.manager.StopAll() + + // Watch for config changes (initial setup happens on first "change") + for { + select { + case newCfg := <-p.cfgWatch.Watch(): + p.logger.Printf("[DEBUG] got new config") + if p.cfg == nil { + // Initial setup + err := p.startPublicListener(ctx, newCfg.PublicListener) + if err != nil { + return err + } + } + + // TODO add/remove upstreams properly based on a diff with current + for _, uc := range newCfg.Upstreams { + uc.Client = p.connect + uc.logger = p.logger + err := p.manager.RunProxier(uc.String(), NewUpstream(uc)) + if err == ErrExists { + continue + } + if err != nil { + p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(), + err) + } + } + p.cfg = newCfg + + case <-ctx.Done(): + return nil + } + } +} + +func (p *Proxy) startPublicListener(ctx context.Context, + cfg PublicListenerConfig) error { + + // Get TLS creds + tlsCfg, err := p.connect.ServerTLSConfig() + if err != nil { + return err + } + cfg.TLSConfig = tlsCfg + + cfg.logger = p.logger + return p.manager.RunProxier("public_listener", NewPublicListener(cfg)) +} diff --git a/proxy/public_listener.go b/proxy/public_listener.go new file mode 100644 index 0000000000..1942992cf4 --- /dev/null +++ b/proxy/public_listener.go @@ -0,0 +1,119 @@ +package proxy + +import ( + "crypto/tls" + "fmt" + "log" + "net" + "os" + "time" +) + +// PublicListener provides an implementation of Proxier that listens for inbound +// mTLS connections, authenticates them with the local agent, and if successful +// forwards them to the locally configured app. +type PublicListener struct { + cfg *PublicListenerConfig +} + +// PublicListenerConfig contains the most basic parameters needed to start the +// proxy. +// +// Note that the tls.Configs here are expected to be "dynamic" in the sense that +// they are expected to use `GetConfigForClient` (added in go 1.8) to return +// dynamic config per connection if required. +type PublicListenerConfig struct { + // BindAddress is the host:port the public mTLS listener will bind to. + BindAddress string `json:"bind_address" hcl:"bind_address"` + + // LocalServiceAddress is the host:port for the proxied application. This + // should be on loopback or otherwise protected as it's plain TCP. + LocalServiceAddress string `json:"local_service_address" hcl:"local_service_address"` + + // TLSConfig config is used for the mTLS listener. + TLSConfig *tls.Config + + // LocalConnectTimeout is the timeout for establishing connections with the + // local backend. Defaults to 1000 (1s). + LocalConnectTimeoutMs int `json:"local_connect_timeout_ms" hcl:"local_connect_timeout_ms"` + + // HandshakeTimeout is the timeout for incoming mTLS clients to complete a + // handshake. Setting this low avoids DOS by malicious clients holding + // resources open. Defaults to 10000 (10s). + HandshakeTimeoutMs int `json:"handshake_timeout_ms" hcl:"handshake_timeout_ms"` + + logger *log.Logger +} + +func (plc *PublicListenerConfig) applyDefaults() { + if plc.LocalConnectTimeoutMs == 0 { + plc.LocalConnectTimeoutMs = 1000 + } + if plc.HandshakeTimeoutMs == 0 { + plc.HandshakeTimeoutMs = 10000 + } + if plc.logger == nil { + plc.logger = log.New(os.Stdout, "", log.LstdFlags) + } +} + +// NewPublicListener returns a proxy instance with the given config. +func NewPublicListener(cfg PublicListenerConfig) *PublicListener { + p := &PublicListener{ + cfg: &cfg, + } + p.cfg.applyDefaults() + return p +} + +// Listener implements Proxier +func (p *PublicListener) Listener() (net.Listener, error) { + l, err := net.Listen("tcp", p.cfg.BindAddress) + if err != nil { + return nil, err + } + + return tls.NewListener(l, p.cfg.TLSConfig), nil +} + +// HandleConn implements Proxier +func (p *PublicListener) HandleConn(conn net.Conn) error { + defer conn.Close() + tlsConn, ok := conn.(*tls.Conn) + if !ok { + return fmt.Errorf("non-TLS conn") + } + + // Setup Handshake timer + to := time.Duration(p.cfg.HandshakeTimeoutMs) * time.Millisecond + err := tlsConn.SetDeadline(time.Now().Add(to)) + if err != nil { + return err + } + + // Force TLS handshake so that abusive clients can't hold resources open + err = tlsConn.Handshake() + if err != nil { + return err + } + + // Handshake OK, clear the deadline + err = tlsConn.SetDeadline(time.Time{}) + if err != nil { + return err + } + + // Huzzah, open a connection to the backend and let them talk + // TODO maybe add a connection pool here? + to = time.Duration(p.cfg.LocalConnectTimeoutMs) * time.Millisecond + dst, err := net.DialTimeout("tcp", p.cfg.LocalServiceAddress, to) + if err != nil { + return fmt.Errorf("failed dialling local app: %s", err) + } + + p.cfg.logger.Printf("[DEBUG] accepted connection from %s", conn.RemoteAddr()) + + // Hand conn and dst over to Conn to manage the byte copying. + c := NewConn(conn, dst) + return c.CopyBytes() +} diff --git a/proxy/public_listener_test.go b/proxy/public_listener_test.go new file mode 100644 index 0000000000..83e84d6584 --- /dev/null +++ b/proxy/public_listener_test.go @@ -0,0 +1,38 @@ +package proxy + +import ( + "crypto/tls" + "testing" + + "github.com/hashicorp/consul/connect" + "github.com/stretchr/testify/require" +) + +func TestPublicListener(t *testing.T) { + addrs := TestLocalBindAddrs(t, 2) + + cfg := PublicListenerConfig{ + BindAddress: addrs[0], + LocalServiceAddress: addrs[1], + HandshakeTimeoutMs: 100, + LocalConnectTimeoutMs: 100, + TLSConfig: connect.TestTLSConfig(t, "ca1", "web"), + } + + testApp, err := NewTestTCPServer(t, cfg.LocalServiceAddress) + require.Nil(t, err) + defer testApp.Close() + + p := NewPublicListener(cfg) + + // Run proxy + r := NewRunner("test", p) + go r.Listen() + defer r.Stop() + + // Proxy and backend are running, play the part of a TLS client using same + // cert for now. + conn, err := tls.Dial("tcp", cfg.BindAddress, connect.TestTLSConfig(t, "ca1", "web")) + require.Nil(t, err) + TestEchoConn(t, conn, "") +} diff --git a/proxy/runner.go b/proxy/runner.go new file mode 100644 index 0000000000..b559b22b71 --- /dev/null +++ b/proxy/runner.go @@ -0,0 +1,118 @@ +package proxy + +import ( + "log" + "net" + "os" + "sync" + "sync/atomic" +) + +// Runner manages the lifecycle of one Proxier. +type Runner struct { + name string + p Proxier + + // Stopping is if a flag that is updated and read atomically + stopping int32 + stopCh chan struct{} + // wg is used to signal back to Stop when all goroutines have stopped + wg sync.WaitGroup + + logger *log.Logger +} + +// NewRunner returns a Runner ready to Listen. +func NewRunner(name string, p Proxier) *Runner { + return NewRunnerWithLogger(name, p, log.New(os.Stdout, "", log.LstdFlags)) +} + +// NewRunnerWithLogger returns a Runner ready to Listen using the specified +// log.Logger. +func NewRunnerWithLogger(name string, p Proxier, logger *log.Logger) *Runner { + return &Runner{ + name: name, + p: p, + stopCh: make(chan struct{}), + logger: logger, + } +} + +// Listen starts the proxier instance. It blocks until a fatal error occurs or +// Stop() is called. +func (r *Runner) Listen() error { + if atomic.LoadInt32(&r.stopping) == 1 { + return ErrStopped + } + + l, err := r.p.Listener() + if err != nil { + return err + } + r.logger.Printf("[INFO] proxy: %s listening on %s", r.name, l.Addr().String()) + + // Run goroutine that will close listener on stop + go func() { + <-r.stopCh + l.Close() + r.logger.Printf("[INFO] proxy: %s shutdown", r.name) + }() + + // Add one for the accept loop + r.wg.Add(1) + defer r.wg.Done() + + for { + conn, err := l.Accept() + if err != nil { + if atomic.LoadInt32(&r.stopping) == 1 { + return nil + } + return err + } + + go r.handle(conn) + } + + return nil +} + +func (r *Runner) handle(conn net.Conn) { + r.wg.Add(1) + defer r.wg.Done() + + // Start a goroutine that will watch for the Runner stopping and close the + // conn, or watch for the Proxier closing (e.g. because other end hung up) and + // stop the goroutine to avoid leaks + doneCh := make(chan struct{}) + defer close(doneCh) + + go func() { + select { + case <-r.stopCh: + r.logger.Printf("[DEBUG] proxy: %s: terminating conn", r.name) + conn.Close() + return + case <-doneCh: + // Connection is already closed, this goroutine not needed any more + return + } + }() + + err := r.p.HandleConn(conn) + if err != nil { + r.logger.Printf("[DEBUG] proxy: %s: connection terminated: %s", r.name, err) + } else { + r.logger.Printf("[DEBUG] proxy: %s: connection terminated", r.name) + } +} + +// Stop stops the Listener and closes any active connections immediately. +func (r *Runner) Stop() error { + old := atomic.SwapInt32(&r.stopping, 1) + if old == 0 { + close(r.stopCh) + } + r.wg.Wait() + return nil +} diff --git a/proxy/testdata/config-kitchensink.hcl b/proxy/testdata/config-kitchensink.hcl new file mode 100644 index 0000000000..7669283538 --- /dev/null +++ b/proxy/testdata/config-kitchensink.hcl @@ -0,0 +1,36 @@ +# Example proxy config with everything specified + +proxy_id = "foo" +token = "11111111-2222-3333-4444-555555555555" + +proxied_service_name = "web" +proxied_service_namespace = "default" + +# Assumes running consul in dev mode from the repo root... +dev_ca_file = "connect/testdata/ca1-ca-consul-internal.cert.pem" +dev_service_cert_file = "connect/testdata/ca1-svc-web.cert.pem" +dev_service_key_file = "connect/testdata/ca1-svc-web.key.pem" + +public_listener { + bind_address = ":9999" + local_service_address = "127.0.0.1:5000" + local_connect_timeout_ms = 1000 + handshake_timeout_ms = 5000 +} + +upstreams = [ + { + local_bind_address = "127.0.0.1:6000" + destination_name = "db" + destination_namespace = "default" + destination_type = "service" + connect_timeout_ms = 10000 + }, + { + local_bind_address = "127.0.0.1:6001" + destination_name = "geo-cache" + destination_namespace = "default" + destination_type = "prepared_query" + connect_timeout_ms = 10000 + } +] diff --git a/proxy/testing.go b/proxy/testing.go new file mode 100644 index 0000000000..bd132b77f4 --- /dev/null +++ b/proxy/testing.go @@ -0,0 +1,170 @@ +package proxy + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "log" + "net" + "sync/atomic" + + "github.com/hashicorp/consul/lib/freeport" + "github.com/mitchellh/go-testing-interface" + "github.com/stretchr/testify/require" +) + +// TestLocalBindAddrs returns n localhost address:port strings with free ports +// for binding test listeners to. +func TestLocalBindAddrs(t testing.T, n int) []string { + ports := freeport.GetT(t, n) + addrs := make([]string, n) + for i, p := range ports { + addrs[i] = fmt.Sprintf("localhost:%d", p) + } + return addrs +} + +// TestTCPServer is a simple TCP echo server for use during tests. +type TestTCPServer struct { + l net.Listener + stopped int32 + accepted, closed, active int32 +} + +// NewTestTCPServer opens as a listening socket on the given address and returns +// a TestTCPServer serving requests to it. The server is already started and can +// be stopped by calling Close(). +func NewTestTCPServer(t testing.T, addr string) (*TestTCPServer, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + log.Printf("test tcp server listening on %s", addr) + s := &TestTCPServer{ + l: l, + } + go s.accept() + return s, nil +} + +// Close stops the server +func (s *TestTCPServer) Close() { + atomic.StoreInt32(&s.stopped, 1) + if s.l != nil { + s.l.Close() + } +} + +func (s *TestTCPServer) accept() error { + for { + conn, err := s.l.Accept() + if err != nil { + if atomic.LoadInt32(&s.stopped) == 1 { + log.Printf("test tcp echo server %s stopped", s.l.Addr()) + return nil + } + log.Printf("test tcp echo server %s failed: %s", s.l.Addr(), err) + return err + } + + atomic.AddInt32(&s.accepted, 1) + atomic.AddInt32(&s.active, 1) + + go func(c net.Conn) { + io.Copy(c, c) + atomic.AddInt32(&s.closed, 1) + atomic.AddInt32(&s.active, -1) + }(conn) + } +} + +// TestEchoConn attempts to write some bytes to conn and expects to read them +// back within a short timeout (10ms). If prefix is not empty we expect it to be +// poresent at the start of all echoed responses (for example to distinguish +// between multiple echo server instances). +func TestEchoConn(t testing.T, conn net.Conn, prefix string) { + t.Helper() + + // Write some bytes and read them back + n, err := conn.Write([]byte("Hello World")) + require.Equal(t, 11, n) + require.Nil(t, err) + + expectLen := 11 + len(prefix) + + buf := make([]byte, expectLen) + // read until our buffer is full - it might be separate packets if prefix is + // in use. + got := 0 + for got < expectLen { + n, err = conn.Read(buf[got:]) + require.Nil(t, err) + got += n + } + require.Equal(t, expectLen, got) + require.Equal(t, prefix+"Hello World", string(buf[:])) +} + +// TestConnectClient is a testing mock that implements connect.Client but +// stubs the methods to make testing simpler. +type TestConnectClient struct { + Server *TestTCPServer + TLSConfig *tls.Config + Calls []callTuple +} +type callTuple struct { + typ, ns, name string +} + +// ServerTLSConfig implements connect.Client +func (c *TestConnectClient) ServerTLSConfig() (*tls.Config, error) { + return c.TLSConfig, nil +} + +// DialService implements connect.Client +func (c *TestConnectClient) DialService(ctx context.Context, namespace, + name string) (net.Conn, error) { + + c.Calls = append(c.Calls, callTuple{"service", namespace, name}) + + // Actually returning a vanilla TCP conn not a TLS one but the caller + // shouldn't care for tests since this interface should hide all the TLS + // config and verification. + return net.Dial("tcp", c.Server.l.Addr().String()) +} + +// DialPreparedQuery implements connect.Client +func (c *TestConnectClient) DialPreparedQuery(ctx context.Context, namespace, + name string) (net.Conn, error) { + + c.Calls = append(c.Calls, callTuple{"prepared_query", namespace, name}) + + // Actually returning a vanilla TCP conn not a TLS one but the caller + // shouldn't care for tests since this interface should hide all the TLS + // config and verification. + return net.Dial("tcp", c.Server.l.Addr().String()) +} + +// TestProxier is a simple Proxier instance that can be used in tests. +type TestProxier struct { + // Addr to listen on + Addr string + // Prefix to write first before echoing on new connections + Prefix string +} + +// Listener implements Proxier +func (p *TestProxier) Listener() (net.Listener, error) { + return net.Listen("tcp", p.Addr) +} + +// HandleConn implements Proxier +func (p *TestProxier) HandleConn(conn net.Conn) error { + _, err := conn.Write([]byte(p.Prefix)) + if err != nil { + return err + } + _, err = io.Copy(conn, conn) + return err +} diff --git a/proxy/upstream.go b/proxy/upstream.go new file mode 100644 index 0000000000..1101624be7 --- /dev/null +++ b/proxy/upstream.go @@ -0,0 +1,261 @@ +package proxy + +import ( + "context" + "fmt" + "log" + "net" + "os" + "time" + + "github.com/hashicorp/consul/connect" +) + +// Upstream provides an implementation of Proxier that listens for inbound TCP +// connections on the private network shared with the proxied application +// (typically localhost). For each accepted connection from the app, it uses the +// connect.Client to discover an instance and connect over mTLS. +type Upstream struct { + cfg *UpstreamConfig +} + +// UpstreamConfig configures the upstream +type UpstreamConfig struct { + // Client is the connect client to perform discovery with + Client connect.Client + + // LocalAddress is the host:port to listen on for local app connections. + LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr"` + + // DestinationName is the service name of the destination. + DestinationName string `json:"destination_name" hcl:"destination_name,attr"` + + // DestinationNamespace is the namespace of the destination. + DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr"` + + // DestinationType determines which service discovery method is used to find a + // candidate instance to connect to. + DestinationType string `json:"destination_type" hcl:"destination_type,attr"` + + // ConnectTimeout is the timeout for establishing connections with the remote + // service instance. Defaults to 10,000 (10s). + ConnectTimeoutMs int `json:"connect_timeout_ms" hcl:"connect_timeout_ms,attr"` + + logger *log.Logger +} + +func (uc *UpstreamConfig) applyDefaults() { + if uc.ConnectTimeoutMs == 0 { + uc.ConnectTimeoutMs = 10000 + } + if uc.logger == nil { + uc.logger = log.New(os.Stdout, "", log.LstdFlags) + } +} + +// String returns a string that uniquely identifies the Upstream. Used for +// identifying the upstream in log output and map keys. +func (uc *UpstreamConfig) String() string { + return fmt.Sprintf("%s->%s:%s/%s", uc.LocalBindAddress, uc.DestinationType, + uc.DestinationNamespace, uc.DestinationName) +} + +// NewUpstream returns an outgoing proxy instance with the given config. +func NewUpstream(cfg UpstreamConfig) *Upstream { + u := &Upstream{ + cfg: &cfg, + } + u.cfg.applyDefaults() + return u +} + +// String returns a string that uniquely identifies the Upstream. Used for +// identifying the upstream in log output and map keys. +func (u *Upstream) String() string { + return u.cfg.String() +} + +// Listener implements Proxier +func (u *Upstream) Listener() (net.Listener, error) { + return net.Listen("tcp", u.cfg.LocalBindAddress) +} + +// HandleConn implements Proxier +func (u *Upstream) HandleConn(conn net.Conn) error { + defer conn.Close() + + // Discover destination instance + dst, err := u.discoverAndDial() + if err != nil { + return err + } + + // Hand conn and dst over to Conn to manage the byte copying. + c := NewConn(conn, dst) + return c.CopyBytes() +} + +func (u *Upstream) discoverAndDial() (net.Conn, error) { + to := time.Duration(u.cfg.ConnectTimeoutMs) * time.Millisecond + ctx, cancel := context.WithTimeout(context.Background(), to) + defer cancel() + + switch u.cfg.DestinationType { + case "service": + return u.cfg.Client.DialService(ctx, u.cfg.DestinationNamespace, + u.cfg.DestinationName) + + case "prepared_query": + return u.cfg.Client.DialPreparedQuery(ctx, u.cfg.DestinationNamespace, + u.cfg.DestinationName) + + default: + return nil, fmt.Errorf("invalid destination type %s", u.cfg.DestinationType) + } +} + +/* +// Upstream represents a service that the proxied application needs to connect +// out to. It provides a dedication local TCP listener (usually listening only +// on loopback) and forwards incoming connections to the proxy to handle. +type Upstream struct { + cfg *UpstreamConfig + wg sync.WaitGroup + + proxy *Proxy + fatalErr error +} + +// NewUpstream creates an upstream ready to attach to a proxy instance with +// Proxy.AddUpstream. An Upstream can only be attached to single Proxy instance +// at once. +func NewUpstream(p *Proxy, cfg *UpstreamConfig) *Upstream { + return &Upstream{ + cfg: cfg, + proxy: p, + shutdown: make(chan struct{}), + } +} + +// UpstreamConfig configures the upstream +type UpstreamConfig struct { + // LocalAddress is the host:port to listen on for local app connections. + LocalAddress string + + // DestinationName is the service name of the destination. + DestinationName string + + // DestinationNamespace is the namespace of the destination. + DestinationNamespace string + + // DestinationType determines which service discovery method is used to find a + // candidate instance to connect to. + DestinationType string +} + +// String returns a string representation for the upstream for debugging or +// use as a unique key. +func (uc *UpstreamConfig) String() string { + return fmt.Sprintf("%s->%s:%s/%s", uc.LocalAddress, uc.DestinationType, + uc.DestinationNamespace, uc.DestinationName) +} + +func (u *Upstream) listen() error { + l, err := net.Listen("tcp", u.cfg.LocalAddress) + if err != nil { + u.fatal(err) + return + } + + for { + conn, err := l.Accept() + if err != nil { + return err + } + + go u.discoverAndConnect(conn) + } +} + +func (u *Upstream) discoverAndConnect(src net.Conn) { + // First, we need an upstream instance from Consul to connect to + dstAddrs, err := u.discoverInstances() + if err != nil { + u.fatal(fmt.Errorf("failed to discover upstream instances: %s", err)) + return + } + + if len(dstAddrs) < 1 { + log.Printf("[INFO] no instances found for %s", len(dstAddrs), u) + } + + // Attempt connection to first one that works + // TODO: configurable number/deadline? + for idx, addr := range dstAddrs { + err := u.proxy.startProxyingConn(src, addr, false) + if err != nil { + log.Printf("[INFO] failed to connect to %s: %s (%d of %d)", addr, err, + idx+1, len(dstAddrs)) + continue + } + return + } + + log.Printf("[INFO] failed to connect to all %d instances for %s", + len(dstAddrs), u) +} + +func (u *Upstream) discoverInstances() ([]string, error) { + switch u.cfg.DestinationType { + case "service": + svcs, _, err := u.cfg.Consul.Health().Service(u.cfg.DestinationName, "", + true, nil) + if err != nil { + return nil, err + } + + addrs := make([]string, len(svcs)) + + // Shuffle order as we go since health endpoint doesn't + perm := rand.Perm(len(addrs)) + for i, se := range svcs { + // Pick location in output array based on next permutation position + j := perm[i] + addrs[j] = fmt.Sprintf("%s:%d", se.Service.Address, se.Service.Port) + } + + return addrs, nil + + case "prepared_query": + pqr, _, err := u.cfg.Consul.PreparedQuery().Execute(u.cfg.DestinationName, + nil) + if err != nil { + return nil, err + } + + addrs := make([]string, 0, len(svcs)) + for _, se := range pqr.Nodes { + addrs = append(addrs, fmt.Sprintf("%s:%d", se.Service.Address, + se.Service.Port)) + } + + // PreparedQuery execution already shuffles the result + return addrs, nil + + default: + u.fatal(fmt.Errorf("invalid destination type %s", u.cfg.DestinationType)) + } +} + +func (u *Upstream) fatal(err Error) { + log.Printf("[ERROR] upstream %s stopping on error: %s", u.cfg.LocalAddress, + err) + u.fatalErr = err +} + +// String returns a string representation for the upstream for debugging or +// use as a unique key. +func (u *Upstream) String() string { + return u.cfg.String() +} +*/ diff --git a/proxy/upstream_test.go b/proxy/upstream_test.go new file mode 100644 index 0000000000..79bca0136a --- /dev/null +++ b/proxy/upstream_test.go @@ -0,0 +1,75 @@ +package proxy + +import ( + "net" + "testing" + + "github.com/hashicorp/consul/connect" + "github.com/stretchr/testify/require" +) + +func TestUpstream(t *testing.T) { + tests := []struct { + name string + cfg UpstreamConfig + }{ + { + name: "service", + cfg: UpstreamConfig{ + DestinationType: "service", + DestinationNamespace: "default", + DestinationName: "db", + ConnectTimeoutMs: 100, + }, + }, + { + name: "prepared_query", + cfg: UpstreamConfig{ + DestinationType: "prepared_query", + DestinationNamespace: "default", + DestinationName: "geo-db", + ConnectTimeoutMs: 100, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + addrs := TestLocalBindAddrs(t, 2) + + testApp, err := NewTestTCPServer(t, addrs[0]) + require.Nil(t, err) + defer testApp.Close() + + // Create mock client that will "discover" our test tcp server as a target and + // skip TLS altogether. + client := &TestConnectClient{ + Server: testApp, + TLSConfig: connect.TestTLSConfig(t, "ca1", "web"), + } + + // Override cfg params + tt.cfg.LocalBindAddress = addrs[1] + tt.cfg.Client = client + + u := NewUpstream(tt.cfg) + + // Run proxy + r := NewRunner("test", u) + go r.Listen() + defer r.Stop() + + // Proxy and fake remote service are running, play the part of the app + // connecting to a remote connect service over TCP. + conn, err := net.Dial("tcp", tt.cfg.LocalBindAddress) + require.Nil(t, err) + TestEchoConn(t, conn, "") + + // Validate that discovery actually was called as we expected + require.Len(t, client.Calls, 1) + require.Equal(t, tt.cfg.DestinationType, client.Calls[0].typ) + require.Equal(t, tt.cfg.DestinationNamespace, client.Calls[0].ns) + require.Equal(t, tt.cfg.DestinationName, client.Calls[0].name) + }) + } +}