diff --git a/agent/agent.go b/agent/agent.go index 23e7b90d6e..fa2ed94eb7 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -18,6 +18,8 @@ import ( "sync" "time" + "google.golang.org/grpc" + "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/ae" @@ -27,10 +29,12 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/local" + "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxyprocess" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/systemd" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/agent/xds" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib" @@ -218,8 +222,23 @@ type Agent struct { // proxyManager is the proxy process manager for managed Connect proxies. proxyManager *proxyprocess.Manager - // proxyLock protects proxy information in the local state from concurrent modification + // proxyLock protects _managed_ proxy information in the local state from + // concurrent modification. It is not needed to work with proxyConfig state. proxyLock sync.Mutex + + // proxyConfig is the manager for proxy service (Kind = connect-proxy) + // configuration state. This ensures all state needed by a proxy registration + // is maintained in cache and handles pushing updates to that state into XDS + // server to be pushed out to Envoy. This is NOT related to managed proxies + // directly. + proxyConfig *proxycfg.Manager + + // xdsServer is the Server instance that serves xDS gRPC API. + xdsServer *xds.Server + + // grpcServer is the server instance used currently to serve xDS API for + // Envoy. + grpcServer *grpc.Server } func New(c *config.RuntimeConfig) (*Agent, error) { @@ -409,6 +428,21 @@ func (a *Agent) Start() error { } } + // Start the proxy config manager. + a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ + Cache: a.cache, + Logger: a.logger, + State: a.State, + Source: &structs.QuerySource{ + Node: a.config.NodeName, + Datacenter: a.config.Datacenter, + Segment: a.config.SegmentName, + }, + }) + if err != nil { + return err + } + // Start watching for critical services to deregister, based on their // checks. go a.reapServices() @@ -446,6 +480,11 @@ func (a *Agent) Start() error { a.httpServers = append(a.httpServers, srv) } + // Start gRPC server. + if err := a.listenAndServeGRPC(); err != nil { + return err + } + // register watches if err := a.reloadWatches(a.config); err != nil { return err @@ -458,6 +497,43 @@ func (a *Agent) Start() error { return nil } +func (a *Agent) listenAndServeGRPC() error { + if len(a.config.GRPCAddrs) < 1 { + return nil + } + + a.xdsServer = &xds.Server{ + Logger: a.logger, + CfgMgr: a.proxyConfig, + Authz: a, + ResolveToken: func(id string) (acl.ACL, error) { + return a.resolveToken(id) + }, + } + var err error + a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile) + if err != nil { + return err + } + + ln, err := a.startListeners(a.config.GRPCAddrs) + if err != nil { + return err + } + + for _, l := range ln { + go func(innerL net.Listener) { + a.logger.Printf("[INFO] agent: Started gRPC server on %s (%s)", + innerL.Addr().String(), innerL.Addr().Network()) + err := a.grpcServer.Serve(innerL) + if err != nil { + a.logger.Printf("[ERR] gRPC server failed: %s", err) + } + }(l) + } + return nil +} + func (a *Agent) listenAndServeDNS() error { notif := make(chan net.Addr, len(a.config.DNSAddrs)) errCh := make(chan error, len(a.config.DNSAddrs)) @@ -497,6 +573,34 @@ func (a *Agent) listenAndServeDNS() error { return merr.ErrorOrNil() } +func (a *Agent) startListeners(addrs []net.Addr) ([]net.Listener, error) { + var ln []net.Listener + for _, addr := range addrs { + var l net.Listener + var err error + + switch x := addr.(type) { + case *net.UnixAddr: + l, err = a.listenSocket(x.Name) + if err != nil { + return nil, err + } + + case *net.TCPAddr: + l, err = net.Listen("tcp", x.String()) + if err != nil { + return nil, err + } + l = &tcpKeepAliveListener{l.(*net.TCPListener)} + + default: + return nil, fmt.Errorf("unsupported address type %T", addr) + } + ln = append(ln, l) + } + return ln, nil +} + // listenHTTP binds listeners to the provided addresses and also returns // pre-configured HTTP servers which are not yet started. The motivation is // that in the current startup/shutdown setup we de-couple the listener @@ -516,38 +620,21 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) { var ln []net.Listener var servers []*HTTPServer start := func(proto string, addrs []net.Addr) error { - for _, addr := range addrs { - var l net.Listener + listeners, err := a.startListeners(addrs) + if err != nil { + return err + } + + for _, l := range listeners { var tlscfg *tls.Config - var err error - - switch x := addr.(type) { - case *net.UnixAddr: - l, err = a.listenSocket(x.Name) + _, isTCP := l.(*tcpKeepAliveListener) + if isTCP && proto == "https" { + tlscfg, err = a.config.IncomingHTTPSConfig() if err != nil { return err } - - case *net.TCPAddr: - l, err = net.Listen("tcp", x.String()) - if err != nil { - return err - } - l = &tcpKeepAliveListener{l.(*net.TCPListener)} - - if proto == "https" { - tlscfg, err = a.config.IncomingHTTPSConfig() - if err != nil { - return err - } - l = tls.NewListener(l, tlscfg) - } - - default: - return fmt.Errorf("unsupported address type %T", addr) + l = tls.NewListener(l, tlscfg) } - ln = append(ln, l) - srv := &HTTPServer{ Server: &http.Server{ Addr: l.Addr().String(), @@ -569,6 +656,7 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) { } } + ln = append(ln, l) servers = append(servers, srv) } return nil @@ -1341,7 +1429,17 @@ func (a *Agent) ShutdownAgent() error { chk.Stop() } - // Stop the proxy manager + // Stop gRPC + if a.grpcServer != nil { + a.grpcServer.Stop() + } + + // Stop the proxy config manager + if a.proxyConfig != nil { + a.proxyConfig.Close() + } + + // Stop the proxy process manager if a.proxyManager != nil { // If persistence is disabled (implies DevMode but a subset of DevMode) then // don't leave the proxies running since the agent will not be able to diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index d35c069cc1..0fcf4c294c 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -6,7 +6,6 @@ import ( "log" "net" "net/http" - "net/url" "strconv" "strings" "time" @@ -20,7 +19,6 @@ import ( "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" - "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/ipaddr" @@ -1432,132 +1430,14 @@ func (s *HTTPServer) AgentConnectAuthorize(resp http.ResponseWriter, req *http.R // Decode the request from the request body var authReq structs.ConnectAuthorizeRequest if err := decodeBody(req, &authReq, nil); err != nil { - resp.WriteHeader(http.StatusBadRequest) - fmt.Fprintf(resp, "Request decode failed: %v", err) - return nil, nil + return nil, BadRequestError{fmt.Sprintf("Request decode failed: %v", err)} } - // We need to have a target to check intentions - if authReq.Target == "" { - resp.WriteHeader(http.StatusBadRequest) - fmt.Fprintf(resp, "Target service must be specified") - return nil, nil - } - - // Parse the certificate URI from the client ID - uriRaw, err := url.Parse(authReq.ClientCertURI) - if err != nil { - return &connectAuthorizeResp{ - Authorized: false, - Reason: fmt.Sprintf("Client ID must be a URI: %s", err), - }, nil - } - uri, err := connect.ParseCertURI(uriRaw) - if err != nil { - return &connectAuthorizeResp{ - Authorized: false, - Reason: fmt.Sprintf("Invalid client ID: %s", err), - }, nil - } - - uriService, ok := uri.(*connect.SpiffeIDService) - if !ok { - return &connectAuthorizeResp{ - Authorized: false, - Reason: "Client ID must be a valid SPIFFE service URI", - }, nil - } - - // We need to verify service:write permissions for the given token. - // We do this manually here since the RPC request below only verifies - // service:read. - rule, err := s.agent.resolveToken(token) + authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq) if err != nil { return nil, err } - if rule != nil && !rule.ServiceWrite(authReq.Target, nil) { - return nil, acl.ErrPermissionDenied - } - - // Validate the trust domain matches ours. Later we will support explicit - // external federation but not built yet. - rootArgs := &structs.DCSpecificRequest{Datacenter: s.agent.config.Datacenter} - raw, _, err := s.agent.cache.Get(cachetype.ConnectCARootName, rootArgs) - if err != nil { - return nil, err - } - - roots, ok := raw.(*structs.IndexedCARoots) - if !ok { - return nil, fmt.Errorf("internal error: roots response type not correct") - } - if roots.TrustDomain == "" { - return nil, fmt.Errorf("connect CA not bootstrapped yet") - } - if roots.TrustDomain != strings.ToLower(uriService.Host) { - return &connectAuthorizeResp{ - Authorized: false, - Reason: fmt.Sprintf("Identity from an external trust domain: %s", - uriService.Host), - }, nil - } - - // TODO(banks): Implement revocation list checking here. - - // Get the intentions for this target service. - args := &structs.IntentionQueryRequest{ - Datacenter: s.agent.config.Datacenter, - Match: &structs.IntentionQueryMatch{ - Type: structs.IntentionMatchDestination, - Entries: []structs.IntentionMatchEntry{ - { - Namespace: structs.IntentionDefaultNamespace, - Name: authReq.Target, - }, - }, - }, - } - args.Token = token - - raw, m, err := s.agent.cache.Get(cachetype.IntentionMatchName, args) - if err != nil { - return nil, err - } - setCacheMeta(resp, &m) - - reply, ok := raw.(*structs.IndexedIntentionMatches) - if !ok { - return nil, fmt.Errorf("internal error: response type not correct") - } - if len(reply.Matches) != 1 { - return nil, fmt.Errorf("Internal error loading matches") - } - - // Test the authorization for each match - for _, ixn := range reply.Matches[0] { - if auth, ok := uriService.Authorize(ixn); ok { - return &connectAuthorizeResp{ - Authorized: auth, - Reason: fmt.Sprintf("Matched intention: %s", ixn.String()), - }, nil - } - } - - // No match, we need to determine the default behavior. We do this by - // specifying the anonymous token token, which will get that behavior. - // The default behavior if ACLs are disabled is to allow connections - // to mimic the behavior of Consul itself: everything is allowed if - // ACLs are disabled. - rule, err = s.agent.resolveToken("") - if err != nil { - return nil, err - } - authz := true - reason := "ACLs disabled, access is allowed by default" - if rule != nil { - authz = rule.IntentionDefaultAllow() - reason = "Default behavior configured by ACLs" - } + setCacheMeta(resp, cacheMeta) return &connectAuthorizeResp{ Authorized: authz, diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 1a0b6115f5..c074e50fd9 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -4938,6 +4938,7 @@ func TestAgentConnectAuthorize_badBody(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() @@ -4945,16 +4946,19 @@ func TestAgentConnectAuthorize_badBody(t *testing.T) { args := []string{} req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() - _, err := a.srv.AgentConnectAuthorize(resp, req) - assert.Nil(err) - assert.Equal(400, resp.Code) - assert.Contains(resp.Body.String(), "decode") + respRaw, err := a.srv.AgentConnectAuthorize(resp, req) + require.Error(err) + assert.Nil(respRaw) + // Note that BadRequestError is handled outside the endpoint handler so we + // still see a 200 if we check here. + assert.Contains(err.Error(), "decode failed") } func TestAgentConnectAuthorize_noTarget(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() @@ -4962,10 +4966,12 @@ func TestAgentConnectAuthorize_noTarget(t *testing.T) { args := &structs.ConnectAuthorizeRequest{} req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() - _, err := a.srv.AgentConnectAuthorize(resp, req) - assert.Nil(err) - assert.Equal(400, resp.Code) - assert.Contains(resp.Body.String(), "Target service") + respRaw, err := a.srv.AgentConnectAuthorize(resp, req) + require.Error(err) + assert.Nil(respRaw) + // Note that BadRequestError is handled outside the endpoint handler so we + // still see a 200 if we check here. + assert.Contains(err.Error(), "Target service must be specified") } // Client ID is not in the valid URI format @@ -4973,6 +4979,7 @@ func TestAgentConnectAuthorize_idInvalidFormat(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() @@ -4984,12 +4991,11 @@ func TestAgentConnectAuthorize_idInvalidFormat(t *testing.T) { req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() respRaw, err := a.srv.AgentConnectAuthorize(resp, req) - assert.Nil(err) - assert.Equal(200, resp.Code) - - obj := respRaw.(*connectAuthorizeResp) - assert.False(obj.Authorized) - assert.Contains(obj.Reason, "Invalid client") + require.Error(err) + assert.Nil(respRaw) + // Note that BadRequestError is handled outside the endpoint handler so we + // still see a 200 if we check here. + assert.Contains(err.Error(), "ClientCertURI not a valid Connect identifier") } // Client ID is a valid URI but its not a service URI @@ -4997,6 +5003,7 @@ func TestAgentConnectAuthorize_idNotService(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() @@ -5008,12 +5015,11 @@ func TestAgentConnectAuthorize_idNotService(t *testing.T) { req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() respRaw, err := a.srv.AgentConnectAuthorize(resp, req) - assert.Nil(err) - assert.Equal(200, resp.Code) - - obj := respRaw.(*connectAuthorizeResp) - assert.False(obj.Authorized) - assert.Contains(obj.Reason, "must be a valid") + require.Error(err) + assert.Nil(respRaw) + // Note that BadRequestError is handled outside the endpoint handler so we + // still see a 200 if we check here. + assert.Contains(err.Error(), "ClientCertURI not a valid Service identifier") } // Test when there is an intention allowing the connection @@ -5162,6 +5168,7 @@ func TestAgentConnectAuthorize_denyTrustDomain(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() @@ -5182,7 +5189,7 @@ func TestAgentConnectAuthorize_denyTrustDomain(t *testing.T) { req.Intention.Action = structs.IntentionActionAllow var reply string - assert.Nil(a.RPC("Intention.Apply", &req, &reply)) + require.NoError(a.RPC("Intention.Apply", &req, &reply)) } { @@ -5193,7 +5200,7 @@ func TestAgentConnectAuthorize_denyTrustDomain(t *testing.T) { req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() respRaw, err := a.srv.AgentConnectAuthorize(resp, req) - assert.Nil(err) + require.NoError(err) assert.Equal(200, resp.Code) obj := respRaw.(*connectAuthorizeResp) @@ -5206,6 +5213,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) { t.Parallel() assert := assert.New(t) + require := require.New(t) a := NewTestAgent(t.Name(), "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") @@ -5227,7 +5235,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) { req.Intention.Action = structs.IntentionActionDeny var reply string - assert.Nil(a.RPC("Intention.Apply", &req, &reply)) + require.NoError(a.RPC("Intention.Apply", &req, &reply)) } { // Allow web to DB @@ -5255,7 +5263,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) { req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() respRaw, err := a.srv.AgentConnectAuthorize(resp, req) - assert.Nil(err) + require.NoError(err) assert.Equal(200, resp.Code) obj := respRaw.(*connectAuthorizeResp) @@ -5272,7 +5280,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) { req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args)) resp := httptest.NewRecorder() respRaw, err := a.srv.AgentConnectAuthorize(resp, req) - assert.Nil(err) + require.NoError(err) assert.Equal(200, resp.Code) obj := respRaw.(*connectAuthorizeResp) diff --git a/agent/connect_auth.go b/agent/connect_auth.go new file mode 100644 index 0000000000..d9e9c31fed --- /dev/null +++ b/agent/connect_auth.go @@ -0,0 +1,138 @@ +package agent + +import ( + "fmt" + "strings" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/structs" +) + +// ConnectAuthorize implements the core authorization logic for Connect. It's in +// a separate agent method here because we need to re-use this both in our own +// HTTP API authz endpoint and in the gRPX xDS/ext_authz API for envoy. +// +// The ACL token and the auth request are provided and the auth decision (true +// means authorised) and reason string are returned. +// +// If the request input is invalid the error returned will be a BadRequestError, +// if the token doesn't grant necessary access then an acl.ErrPermissionDenied +// error is returned, otherwise error indicates an unexpected server failure. If +// access is denied, no error is returned but the first return value is false. +func (a *Agent) ConnectAuthorize(token string, + req *structs.ConnectAuthorizeRequest) (authz bool, reason string, m *cache.ResultMeta, err error) { + + // Helper to make the error cases read better without resorting to named + // returns which get messy and prone to mistakes in a method this long. + returnErr := func(err error) (bool, string, *cache.ResultMeta, error) { + return false, "", nil, err + } + + if req == nil { + return returnErr(BadRequestError{"Invalid request"}) + } + + // We need to have a target to check intentions + if req.Target == "" { + return returnErr(BadRequestError{"Target service must be specified"}) + } + + // Parse the certificate URI from the client ID + uri, err := connect.ParseCertURIFromString(req.ClientCertURI) + if err != nil { + return returnErr(BadRequestError{"ClientCertURI not a valid Connect identifier"}) + } + + uriService, ok := uri.(*connect.SpiffeIDService) + if !ok { + return returnErr(BadRequestError{"ClientCertURI not a valid Service identifier"}) + } + + // We need to verify service:write permissions for the given token. + // We do this manually here since the RPC request below only verifies + // service:read. + rule, err := a.resolveToken(token) + if err != nil { + return returnErr(err) + } + if rule != nil && !rule.ServiceWrite(req.Target, nil) { + return returnErr(acl.ErrPermissionDenied) + } + + // Validate the trust domain matches ours. Later we will support explicit + // external federation but not built yet. + rootArgs := &structs.DCSpecificRequest{Datacenter: a.config.Datacenter} + raw, _, err := a.cache.Get(cachetype.ConnectCARootName, rootArgs) + if err != nil { + return returnErr(err) + } + + roots, ok := raw.(*structs.IndexedCARoots) + if !ok { + return returnErr(fmt.Errorf("internal error: roots response type not correct")) + } + if roots.TrustDomain == "" { + return returnErr(fmt.Errorf("Connect CA not bootstrapped yet")) + } + if roots.TrustDomain != strings.ToLower(uriService.Host) { + reason = fmt.Sprintf("Identity from an external trust domain: %s", + uriService.Host) + return false, reason, nil, nil + } + + // TODO(banks): Implement revocation list checking here. + + // Get the intentions for this target service. + args := &structs.IntentionQueryRequest{ + Datacenter: a.config.Datacenter, + Match: &structs.IntentionQueryMatch{ + Type: structs.IntentionMatchDestination, + Entries: []structs.IntentionMatchEntry{ + { + Namespace: structs.IntentionDefaultNamespace, + Name: req.Target, + }, + }, + }, + QueryOptions: structs.QueryOptions{Token: token}, + } + + raw, meta, err := a.cache.Get(cachetype.IntentionMatchName, args) + if err != nil { + return returnErr(err) + } + + reply, ok := raw.(*structs.IndexedIntentionMatches) + if !ok { + return returnErr(fmt.Errorf("internal error: response type not correct")) + } + if len(reply.Matches) != 1 { + return returnErr(fmt.Errorf("Internal error loading matches")) + } + + // Test the authorization for each match + for _, ixn := range reply.Matches[0] { + if auth, ok := uriService.Authorize(ixn); ok { + reason = fmt.Sprintf("Matched intention: %s", ixn.String()) + return auth, reason, &meta, nil + } + } + + // No match, we need to determine the default behavior. We do this by + // specifying the anonymous token, which will get the default behavior. The + // default behavior if ACLs are disabled is to allow connections to mimic the + // behavior of Consul itself: everything is allowed if ACLs are disabled. + rule, err = a.resolveToken("") + if err != nil { + return returnErr(err) + } + if rule == nil { + // ACLs not enabled at all, the default is allow all. + return true, "ACLs disabled, access is allowed by default", &meta, nil + } + reason = "Default behavior configured by ACLs" + return rule.IntentionDefaultAllow(), reason, &meta, nil +} diff --git a/agent/http.go b/agent/http.go index d7fa2e6be5..b32ce1ec0f 100644 --- a/agent/http.go +++ b/agent/http.go @@ -468,8 +468,11 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) { // setCacheMeta sets http response headers to indicate cache status. func setCacheMeta(resp http.ResponseWriter, m *cache.ResultMeta) { + if m == nil { + return + } str := "MISS" - if m != nil && m.Hit { + if m.Hit { str = "HIT" } resp.Header().Set("X-Cache", str) diff --git a/agent/xds/server.go b/agent/xds/server.go index 1d0a7edf64..7a4a0addc1 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -76,7 +76,7 @@ type ConnectAuthz interface { // easier testing without several layers of mocked cache, local state and // proxycfg.Manager. type ConfigManager interface { - Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, func()) + Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) } // Server represents a gRPC server that can handle both XDS and ext_authz diff --git a/agent/xds/server_test.go b/agent/xds/server_test.go index afec10a62d..ecb27afa83 100644 --- a/agent/xds/server_test.go +++ b/agent/xds/server_test.go @@ -70,7 +70,7 @@ func (m *testManager) DeliverConfig(t *testing.T, proxyID string, cfg *proxycfg. } // Watch implements ConfigManager -func (m *testManager) Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, func()) { +func (m *testManager) Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) { m.Lock() defer m.Unlock() // ch might be nil but then it will just block forever diff --git a/api/api.go b/api/api.go index 0047812817..6efd9d4b03 100644 --- a/api/api.go +++ b/api/api.go @@ -61,6 +61,12 @@ const ( // HTTPSSLVerifyEnvName defines an environment variable name which sets // whether or not to disable certificate checking. HTTPSSLVerifyEnvName = "CONSUL_HTTP_SSL_VERIFY" + + // GRPCAddrEnvName defines an environment variable name which sets the gRPC + // address for consul connect envoy. Note this isn't actually used by the api + // client in this package but is defined here for consistency with all the + // other ENV names we use. + GRPCAddrEnvName = "CONSUL_GRPC_ADDR" ) // QueryOptions are used to parameterize a query diff --git a/command/agent/agent.go b/command/agent/agent.go index 91f022cc89..1fb76d0d43 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -22,6 +22,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/logutils" "github.com/mitchellh/cli" + "google.golang.org/grpc/grpclog" ) func New(ui cli.Ui, revision, version, versionPre, versionHuman string, shutdownCh <-chan struct{}) *cmd { @@ -202,6 +203,9 @@ func (c *cmd) run(args []string) int { c.logOutput = logOutput c.logger = log.New(logOutput, "", log.LstdFlags) + // Setup gRPC logger to use the same output/filtering + grpclog.SetLoggerV2(logger.NewGRPCLogger(logConfig, c.logger)) + memSink, err := lib.InitTelemetry(config.Telemetry) if err != nil { c.UI.Error(err.Error()) diff --git a/command/commands_oss.go b/command/commands_oss.go index 46f328b7fd..a96d90ffe6 100644 --- a/command/commands_oss.go +++ b/command/commands_oss.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/command/connect/ca" caget "github.com/hashicorp/consul/command/connect/ca/get" caset "github.com/hashicorp/consul/command/connect/ca/set" + "github.com/hashicorp/consul/command/connect/envoy" "github.com/hashicorp/consul/command/connect/proxy" "github.com/hashicorp/consul/command/event" "github.com/hashicorp/consul/command/exec" @@ -77,6 +78,7 @@ func init() { Register("connect ca get-config", func(ui cli.Ui) (cli.Command, error) { return caget.New(ui), nil }) Register("connect ca set-config", func(ui cli.Ui) (cli.Command, error) { return caset.New(ui), nil }) Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil }) + Register("connect envoy", func(ui cli.Ui) (cli.Command, error) { return envoy.New(ui), nil }) Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil }) Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil }) Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil }) diff --git a/command/connect/envoy/bootstrap_tpl.go b/command/connect/envoy/bootstrap_tpl.go new file mode 100644 index 0000000000..98ab41f5a0 --- /dev/null +++ b/command/connect/envoy/bootstrap_tpl.go @@ -0,0 +1,55 @@ +package envoy + +type templateArgs struct { + ProxyCluster, ProxyID string + AgentHTTPAddress string + AgentHTTPPort string + AgentTLS bool + AgentCAFile string + AdminBindAddress string + AdminBindPort string + LocalAgentClusterName string + Token string +} + +const bootstrapTemplate = ` +# Bootstrap Config for Consul Connect +# Generated by consul connect envoy +admin: + access_log_path: /dev/null + address: + socket_address: + address: "{{ .AdminBindAddress }}" + port_value: {{ .AdminBindPort }} +node: + cluster: "{{ .ProxyCluster }}" + id: "{{ .ProxyID }}" +static_resources: + clusters: + - name: "{{ .LocalAgentClusterName }}" + connect_timeout: 1s + type: STATIC + {{ if .AgentTLS -}} + tls_context: + common_tls_context: + validation_context: + trusted_ca: + filename: {{ .AgentCAFile }} + {{- end }} + http2_protocol_options: {} + hosts: + - socket_address: + address: "{{ .AgentHTTPAddress }}" + port_value: {{ .AgentHTTPPort }} +dynamic_resources: + lds_config: {ads: {}} + cds_config: {ads: {}} + ads_config: + api_type: GRPC + grpc_services: + initial_metadata: + - key: x-consul-token + value: "{{ .Token }}" + envoy_grpc: + cluster_name: "{{ .LocalAgentClusterName }}" +` diff --git a/command/connect/envoy/envoy.go b/command/connect/envoy/envoy.go new file mode 100644 index 0000000000..0aca239d3a --- /dev/null +++ b/command/connect/envoy/envoy.go @@ -0,0 +1,301 @@ +package envoy + +import ( + "bytes" + "flag" + "fmt" + "html/template" + "net" + "os" + "os/exec" + "strconv" + "strings" + "syscall" + + proxyAgent "github.com/hashicorp/consul/agent/proxyprocess" + "github.com/hashicorp/consul/agent/xds" + "github.com/hashicorp/consul/api" + proxyCmd "github.com/hashicorp/consul/command/connect/proxy" + "github.com/hashicorp/consul/command/flags" + + "github.com/mitchellh/cli" +) + +func New(ui cli.Ui) *cmd { + ui = &cli.PrefixedUi{ + OutputPrefix: "==> ", + InfoPrefix: " ", + ErrorPrefix: "==> ", + Ui: ui, + } + + c := &cmd{UI: ui} + c.init() + return c +} + +type cmd struct { + UI cli.Ui + flags *flag.FlagSet + http *flags.HTTPFlags + help string + client *api.Client + + // flags + proxyID string + sidecarFor string + adminBind string + envoyBin string + bootstrap bool + grpcAddr string +} + +func (c *cmd) init() { + c.flags = flag.NewFlagSet("", flag.ContinueOnError) + + c.flags.StringVar(&c.proxyID, "proxy-id", "", + "The proxy's ID on the local agent.") + + c.flags.StringVar(&c.sidecarFor, "sidecar-for", "", + "The ID of a service instance on the local agent that this proxy should "+ + "become a sidecar for. It requires that the proxy service is registered "+ + "with the agent as a connect-proxy with Proxy.DestinationServiceID set "+ + "to this value. If more than one such proxy is registered it will fail.") + + c.flags.StringVar(&c.envoyBin, "envoy-binary", "", + "The full path to the envoy binary to run. By default will just search "+ + "$PATH. Ignored if -bootstrap is used.") + + c.flags.StringVar(&c.adminBind, "admin-bind", "localhost:19000", + "The address:port to start envoy's admin server on. Envoy requires this "+ + "but care must be taked to ensure it's not exposed to untrusted network "+ + "as it has full control over the secrets and config of the proxy.") + + c.flags.BoolVar(&c.bootstrap, "bootstrap", false, + "Generate the bootstrap.yaml but don't exec envoy") + + c.flags.StringVar(&c.grpcAddr, "grpc-addr", "", + "Set the agent's gRPC address and port (in http(s)://host:port format). "+ + "Alternatively, you can specify CONSUL_GRPC_ADDR in ENV.") + + c.http = &flags.HTTPFlags{} + flags.Merge(c.flags, c.http.ClientFlags()) + c.help = flags.Usage(help, c.flags) +} + +func (c *cmd) Run(args []string) int { + if err := c.flags.Parse(args); err != nil { + return 1 + } + passThroughArgs := c.flags.Args() + + // Load the proxy ID and token from env vars if they're set + if c.proxyID == "" { + c.proxyID = os.Getenv(proxyAgent.EnvProxyID) + } + if c.sidecarFor == "" { + c.sidecarFor = os.Getenv(proxyAgent.EnvSidecarFor) + } + if c.grpcAddr == "" { + c.grpcAddr = os.Getenv(api.GRPCAddrEnvName) + } + if c.grpcAddr == "" { + c.UI.Error("Either -grpc-addr or CONSUL_GRPC_ADDR must be specified") + return 1 + } + if c.http.Token() == "" { + // Extra check needed since CONSUL_HTTP_TOKEN has not been consulted yet but + // calling SetToken with empty will force that to override the + if proxyToken := os.Getenv(proxyAgent.EnvProxyToken); proxyToken != "" { + c.http.SetToken(proxyToken) + } + } + + // Setup Consul client + client, err := c.http.APIClient() + if err != nil { + c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + c.client = client + + // See if we need to lookup proxyID + if c.proxyID == "" && c.sidecarFor != "" { + proxyID, err := c.lookupProxyIDForSidecar() + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + c.proxyID = proxyID + } + if c.proxyID == "" { + c.UI.Error("No proxy ID specified. One of -proxy-id or -sidecar-for is " + + "required") + return 1 + } + + // Generate config + bootstrapYaml, err := c.generateConfig() + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + + if c.bootstrap { + // Just output it and we are done + fmt.Println(bootstrapYaml) + return 0 + } + + // Find Envoy binary + binary, err := c.findBinary() + if err != nil { + c.UI.Error("Couldn't find envoy binary: " + err.Error()) + return 1 + } + + // First argument needs to be the executable name. + + // TODO(banks): passing config including an ACL token on command line is jank + // - this is world readable. It's easiest thing for now. Temp files are kinda + // gross in a different way - we can limit to same-user access which is much + // better but we are leaving the ACL secret on disk unencrypted for an + // uncontrolled amount of time and in a location the operator doesn't even + // know about. Envoy doesn't support reading bootstrap from stdin or ENV + envoyArgs := []string{binary, "--config-yaml", bootstrapYaml} + envoyArgs = append(envoyArgs, passThroughArgs...) + + // Exec + err = syscall.Exec(binary, envoyArgs, os.Environ()) + if err != nil { + c.UI.Error("Failed to exec envoy: " + err.Error()) + return 1 + } + return 0 +} + +func (c *cmd) findBinary() (string, error) { + if c.envoyBin != "" { + return c.envoyBin, nil + } + return exec.LookPath("envoy") +} + +// TODO(banks) this method ended up with a few subtleties that should be unit +// tested. +func (c *cmd) generateConfig() (string, error) { + var t = template.Must(template.New("bootstrap").Parse(bootstrapTemplate)) + + httpCfg := api.DefaultConfig() + c.http.MergeOntoConfig(httpCfg) + + // Decide on TLS if the scheme is provided and indicates it, if the HTTP env + // suggests TLS is supported explicitly (CONSUL_HTTP_SSL) or implicitly + // (CONSUL_HTTP_ADDR) is https:// + useTLS := false + if strings.HasPrefix(strings.ToLower(c.grpcAddr), "https://") { + useTLS = true + } else if useSSLEnv := os.Getenv(api.HTTPSSLEnvName); useSSLEnv != "" { + if enabled, err := strconv.ParseBool(useSSLEnv); err != nil { + useTLS = enabled + } + } else if strings.HasPrefix(strings.ToLower(httpCfg.Address), "https://") { + useTLS = true + } + + // We want to allow grpcAddr set as host:port with no scheme but if the host + // is an IP this will fail to parse as a URL with "parse 127.0.0.1:8500: first + // path segment in URL cannot contain colon". On the other hand we also + // support both http(s)://host:port and unix:///path/to/file. + addrPort := strings.TrimPrefix(c.grpcAddr, "http://") + addrPort = strings.TrimPrefix(c.grpcAddr, "https://") + + agentAddr, agentPort, err := net.SplitHostPort(addrPort) + if err != nil { + return "", fmt.Errorf("Invalid Consul HTTP address: %s", err) + } + if agentAddr == "" { + agentAddr = "127.0.0.1" + } + + // We use STATIC for agent which means we need to resolve DNS names like + // `localhost` ourselves. We could use STRICT_DNS or DYNAMIC_DNS with envoy + // but Envoy resolves `localhost` differently to go on macOS at least which + // causes paper cuts like default dev agent (which binds specifically to + // 127.0.0.1) isn't reachable since Envoy resolves localhost to `[::]` and + // can't connect. + agentIP, err := net.ResolveIPAddr("ip", agentAddr) + if err != nil { + return "", fmt.Errorf("Failed to resolve agent address: %s", err) + } + + adminAddr, adminPort, err := net.SplitHostPort(c.adminBind) + if err != nil { + return "", fmt.Errorf("Invalid Consul HTTP address: %s", err) + } + + // Envoy requires IP addresses to bind too when using static so resolve DNS or + // localhost here. + adminBindIP, err := net.ResolveIPAddr("ip", adminAddr) + if err != nil { + return "", fmt.Errorf("Failed to resolve admin bind address: %s", err) + } + + args := templateArgs{ + ProxyCluster: c.proxyID, + ProxyID: c.proxyID, + AgentHTTPAddress: agentIP.String(), + AgentHTTPPort: agentPort, + AgentTLS: useTLS, + AgentCAFile: httpCfg.TLSConfig.CAFile, + AdminBindAddress: adminBindIP.String(), + AdminBindPort: adminPort, + Token: httpCfg.Token, + LocalAgentClusterName: xds.LocalAgentClusterName, + } + + var buf bytes.Buffer + err = t.Execute(&buf, args) + if err != nil { + return "", err + } + return buf.String(), nil +} + +func (c *cmd) lookupProxyIDForSidecar() (string, error) { + return proxyCmd.LookupProxyIDForSidecar(c.client, c.sidecarFor) +} + +func (c *cmd) Synopsis() string { + return synopsis +} + +func (c *cmd) Help() string { + return c.help +} + +const synopsis = "Runs or Configures Envoy as a Connect proxy" +const help = ` +Usage: consul connect envoy [options] + + Generates the bootstrap configuration needed to start an Envoy proxy instance + for use as a Connect sidecar for a particular service instance. By default it + will generate the config and then exec Envoy directly until it exits normally. + + It will search $PATH for the envoy binary but this can be overridden with + -envoy-binary. + + It can instead only generate the bootstrap.yaml based on the current ENV and + arguments using -bootstrap. + + The proxy requires service:write permissions for the service it represents. + The token may be passed via the CLI or the CONSUL_TOKEN environment + variable. + + The example below shows how to start a local proxy as a sidecar to a "web" + service instance. It assumes that the proxy was already registered with it's + Config for example via a sidecar_service block. + + $ consul connect envoy -sidecar-for web + +` diff --git a/command/connect/envoy/envoy_test.go b/command/connect/envoy/envoy_test.go new file mode 100644 index 0000000000..ed870dafb1 --- /dev/null +++ b/command/connect/envoy/envoy_test.go @@ -0,0 +1,13 @@ +package envoy + +import ( + "strings" + "testing" +) + +func TestCatalogCommand_noTabs(t *testing.T) { + t.Parallel() + if strings.ContainsRune(New(nil).Help(), '\t') { + t.Fatal("help has tabs") + } +} diff --git a/command/connect/proxy/proxy.go b/command/connect/proxy/proxy.go index 2b0ddf551e..d76d9193c5 100644 --- a/command/connect/proxy/proxy.go +++ b/command/connect/proxy/proxy.go @@ -212,27 +212,37 @@ func (c *cmd) Run(args []string) int { } func (c *cmd) lookupProxyIDForSidecar(client *api.Client) (string, error) { + return LookupProxyIDForSidecar(client, c.sidecarFor) +} + +// LookupProxyIDForSidecar finds candidate local proxy registrations that are a +// sidcar for the given service. It will return an ID if and only if there is +// exactly one registed connect proxy with `Proxy.DestinationServiceID` set to +// the specified service ID. +// +// This is exported to share it with the connect envoy command. +func LookupProxyIDForSidecar(client *api.Client, sidecarFor string) (string, error) { svcs, err := client.Agent().Services() if err != nil { return "", fmt.Errorf("Failed looking up sidecar proxy info for %s: %s", - c.sidecarFor, err) + sidecarFor, err) } var proxyIDs []string for _, svc := range svcs { if svc.Kind == api.ServiceKindConnectProxy && svc.Proxy != nil && - strings.ToLower(svc.Proxy.DestinationServiceID) == c.sidecarFor { + strings.ToLower(svc.Proxy.DestinationServiceID) == sidecarFor { proxyIDs = append(proxyIDs, svc.ID) } } if len(proxyIDs) == 0 { - return "", fmt.Errorf("No sidecar proxy registereded for %s", c.sidecarFor) + return "", fmt.Errorf("No sidecar proxy registereded for %s", sidecarFor) } if len(proxyIDs) > 1 { return "", fmt.Errorf("More than one sidecar proxy registereded for %s.\n"+ " Start proxy with -proxy-id and one of the following IDs: %s", - c.sidecarFor, strings.Join(proxyIDs, ", ")) + sidecarFor, strings.Join(proxyIDs, ", ")) } return proxyIDs[0], nil } diff --git a/command/connect/proxy/proxy_test.go b/command/connect/proxy/proxy_test.go index f2f470281b..e09d95f3ce 100644 --- a/command/connect/proxy/proxy_test.go +++ b/command/connect/proxy/proxy_test.go @@ -1,6 +1,7 @@ package proxy import ( + "strings" "testing" "time" @@ -184,3 +185,10 @@ func testConfig(t *testing.T, cw proxy.ConfigWatcher) *proxy.Config { return nil // satisfy compiler } } + +func TestCatalogCommand_noTabs(t *testing.T) { + t.Parallel() + if strings.ContainsRune(New(nil, nil).Help(), '\t') { + t.Fatal("help has tabs") + } +} diff --git a/logger/grpc.go b/logger/grpc.go new file mode 100644 index 0000000000..90a490d866 --- /dev/null +++ b/logger/grpc.go @@ -0,0 +1,105 @@ +package logger + +import ( + "fmt" + "log" +) + +// GRPCLogger wrapps a *log.Logger and implements the grpclog.LoggerV2 interface +// allowing gRPC servers to log to the standard Consul logger. +type GRPCLogger struct { + level string + l *log.Logger +} + +// NewGRPCLogger creates a grpclog.LoggerV2 that will output to the supplied +// logger with Severity/Verbosity level appropriate for the given config. +// +// Note that grpclog has Info, Warning, Error, Fatal severity levels AND integer +// verbosity levels for additional info. Verbose logs in glog are always INFO +// severity so we map Info,V0 to INFO, Info,V1 to DEBUG, and Info,V>1 to TRACE. +func NewGRPCLogger(config *Config, logger *log.Logger) *GRPCLogger { + return &GRPCLogger{ + level: config.LogLevel, + l: logger, + } +} + +// Info implements grpclog.LoggerV2 +func (g *GRPCLogger) Info(args ...interface{}) { + args = append([]interface{}{"[INFO] "}, args...) + g.l.Print(args...) +} + +// Infoln implements grpclog.LoggerV2 +func (g *GRPCLogger) Infoln(args ...interface{}) { + g.Info(fmt.Sprintln(args...)) +} + +// Infof implements grpclog.LoggerV2 +func (g *GRPCLogger) Infof(format string, args ...interface{}) { + g.Info(fmt.Sprintf(format, args...)) +} + +// Warning implements grpclog.LoggerV2 +func (g *GRPCLogger) Warning(args ...interface{}) { + args = append([]interface{}{"[WARN] "}, args...) + g.l.Print(args...) +} + +// Warningln implements grpclog.LoggerV2 +func (g *GRPCLogger) Warningln(args ...interface{}) { + g.Warning(fmt.Sprintln(args...)) +} + +// Warningf implements grpclog.LoggerV2 +func (g *GRPCLogger) Warningf(format string, args ...interface{}) { + g.Warning(fmt.Sprintf(format, args...)) +} + +// Error implements grpclog.LoggerV2 +func (g *GRPCLogger) Error(args ...interface{}) { + args = append([]interface{}{"[ERR] "}, args...) + g.l.Print(args...) +} + +// Errorln implements grpclog.LoggerV2 +func (g *GRPCLogger) Errorln(args ...interface{}) { + g.Error(fmt.Sprintln(args...)) +} + +// Errorf implements grpclog.LoggerV2 +func (g *GRPCLogger) Errorf(format string, args ...interface{}) { + g.Error(fmt.Sprintf(format, args...)) +} + +// Fatal implements grpclog.LoggerV2 +func (g *GRPCLogger) Fatal(args ...interface{}) { + args = append([]interface{}{"[ERR] "}, args...) + g.l.Fatal(args...) +} + +// Fatalln implements grpclog.LoggerV2 +func (g *GRPCLogger) Fatalln(args ...interface{}) { + g.Fatal(fmt.Sprintln(args...)) +} + +// Fatalf implements grpclog.LoggerV2 +func (g *GRPCLogger) Fatalf(format string, args ...interface{}) { + g.Fatal(fmt.Sprintf(format, args...)) +} + +// V implements grpclog.LoggerV2 +func (g *GRPCLogger) V(l int) bool { + switch g.level { + case "TRACE": + // Enable ALL the verbosity! + return true + case "DEBUG": + return l < 2 + case "INFO": + return l < 1 + default: + return false + } +} diff --git a/logger/grpc_test.go b/logger/grpc_test.go new file mode 100644 index 0000000000..6b1ab34d9a --- /dev/null +++ b/logger/grpc_test.go @@ -0,0 +1,94 @@ +package logger + +import ( + "bytes" + "fmt" + "log" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "google.golang.org/grpc/grpclog" +) + +func TestGRPCLogger(t *testing.T) { + var out bytes.Buffer + // No flags so we don't have to include date/time in expected output + logger := log.New(&out, "", 0) + grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: "TRACE"}, logger)) + + // All of these should output something + grpclog.Info("Info,") + grpclog.Infoln("Infoln") + grpclog.Infof("Infof: %d\n", 1) + + grpclog.Warning("Warning,") + grpclog.Warningln("Warningln") + grpclog.Warningf("Warningf: %d\n", 1) + + grpclog.Error("Error,") + grpclog.Errorln("Errorln") + grpclog.Errorf("Errorf: %d\n", 1) + + // Fatal tests are hard... assume they are good! + expect := `[INFO] Info, +[INFO] Infoln +[INFO] Infof: 1 +[WARN] Warning, +[WARN] Warningln +[WARN] Warningf: 1 +[ERR] Error, +[ERR] Errorln +[ERR] Errorf: 1 +` + + require.Equal(t, expect, out.String()) +} + +func TestGRPCLogger_V(t *testing.T) { + + tests := []struct { + level string + v int + want bool + }{ + {"ERR", -1, false}, + {"ERR", 0, false}, + {"ERR", 1, false}, + {"ERR", 2, false}, + {"ERR", 3, false}, + {"WARN", -1, false}, + {"WARN", 0, false}, + {"WARN", 1, false}, + {"WARN", 2, false}, + {"WARN", 3, false}, + {"INFO", -1, true}, + {"INFO", 0, true}, + {"INFO", 1, false}, + {"INFO", 2, false}, + {"INFO", 3, false}, + {"DEBUG", -1, true}, + {"DEBUG", 0, true}, + {"DEBUG", 1, true}, + {"DEBUG", 2, false}, + {"DEBUG", 3, false}, + {"TRACE", -1, true}, + {"TRACE", 0, true}, + {"TRACE", 1, true}, + {"TRACE", 2, true}, + {"TRACE", 3, true}, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("%s,%d", tt.level, tt.v), func(t *testing.T) { + var out bytes.Buffer + // No flags so we don't have to include date/time in expected output + logger := log.New(&out, "", 0) + grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: tt.level}, logger)) + + assert.Equal(t, tt.want, grpclog.V(tt.v)) + }) + } + +}