From 5208ea90e41c8ee91b4b47d42ef9381b6c55f253 Mon Sep 17 00:00:00 2001 From: Poonam Jadhav Date: Fri, 14 Jul 2023 14:09:02 -0400 Subject: [PATCH] NET-4657/add resource service client (#18053) ### Description Dan had already started on this [task](https://github.com/hashicorp/consul/pull/17849) which is needed to start building the HTTP APIs. This just needed some cleanup to get it ready for review. Overview: - Rename `internalResourceServiceClient` to `insecureResourceServiceClient` for name consistency - Configure a `secureResourceServiceClient` with auth enabled ### PR Checklist * [ ] ~updated test coverage~ * [ ] ~external facing docs updated~ * [x] appropriate backport labels added * [ ] ~not a security concern~ --- agent/acl_test.go | 4 +++ agent/agent.go | 4 +++ agent/consul/client.go | 15 ++++++++ agent/consul/server.go | 78 +++++++++++++++++++++++++++++++++--------- 4 files changed, 85 insertions(+), 16 deletions(-) diff --git a/agent/acl_test.go b/agent/acl_test.go index 40662231ac..5e5969dd64 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/types" @@ -163,6 +164,9 @@ func (a *TestACLAgent) Stats() map[string]map[string]string { func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error { return fmt.Errorf("Unimplemented") } +func (a *TestACLAgent) ResourceServiceClient() pbresource.ResourceServiceClient { + return nil +} func TestACL_Version8EnabledByDefault(t *testing.T) { t.Parallel() diff --git a/agent/agent.go b/agent/agent.go index 881b94209d..ef65592352 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -71,6 +71,7 @@ import ( "github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/pboperator" "github.com/hashicorp/consul/proto/private/pbpeering" "github.com/hashicorp/consul/tlsutil" @@ -198,6 +199,9 @@ type delegate interface { RPC(ctx context.Context, method string, args interface{}, reply interface{}) error + // ResourceServiceClient is a client for the gRPC Resource Service. + ResourceServiceClient() pbresource.ResourceServiceClient + SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error Shutdown() error Stats() map[string]map[string]string diff --git a/agent/consul/client.go b/agent/consul/client.go index e4a3f83324..256e0e58e3 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" ) @@ -93,6 +94,9 @@ type Client struct { EnterpriseClient tlsConfigurator *tlsutil.Configurator + + // resourceServiceClient is a client for the gRPC Resource Service. + resourceServiceClient pbresource.ResourceServiceClient } // NewClient creates and returns a Client @@ -151,6 +155,13 @@ func NewClient(config *Config, deps Deps) (*Client, error) { } c.router = deps.Router + conn, err := deps.GRPCConnPool.ClientConn(deps.ConnPool.Datacenter) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("Failed to get gRPC client connection: %w", err) + } + c.resourceServiceClient = pbresource.NewResourceServiceClient(conn) + // Start LAN event handlers after the router is complete since the event // handlers depend on the router and the router depends on Serf. go c.lanEventHandler() @@ -451,3 +462,7 @@ func (c *Client) AgentEnterpriseMeta() *acl.EnterpriseMeta { func (c *Client) agentSegmentName() string { return c.config.Segment } + +func (c *Client) ResourceServiceClient() pbresource.ResourceServiceClient { + return c.resourceServiceClient +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 6bb424c675..2cfe9cb0aa 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -442,10 +442,20 @@ type Server struct { // typeRegistry contains Consul's registered resource types. typeRegistry resource.Registry - // internalResourceServiceClient is a client that can be used to communicate - // with the Resource Service in-process (i.e. not via the network) without auth. - // It should only be used for purely-internal workloads, such as controllers. - internalResourceServiceClient pbresource.ResourceServiceClient + // resourceServiceServer implements the Resource Service. + resourceServiceServer *resourcegrpc.Server + + // insecureResourceServiceClient is a client that can be used to communicate + // with the Resource Service in-process (i.e. not via the network) *without* + // auth. It should only be used for purely-internal workloads, such as + // controllers. + insecureResourceServiceClient pbresource.ResourceServiceClient + + // secureResourceServiceClient is a client that can be used to communicate + // with the Resource Service in-process (i.e. not via the network) *with* auth. + // It can be used to make requests to the Resource Service on behalf of the user + // (e.g. from the HTTP API). + secureResourceServiceClient pbresource.ResourceServiceClient // controllerManager schedules the execution of controllers. controllerManager *controller.Manager @@ -803,11 +813,16 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s) s.grpcLeaderForwarder = flat.LeaderForwarder - if err := s.setupInternalResourceService(logger); err != nil { + if err := s.setupSecureResourceServiceClient(); err != nil { return nil, err } + + if err := s.setupInsecureResourceServiceClient(logger); err != nil { + return nil, err + } + s.controllerManager = controller.NewManager( - s.internalResourceServiceClient, + s.insecureResourceServiceClient, logger.Named(logging.ControllerRuntime), ) s.registerResources(flat) @@ -929,6 +944,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler s.peerStreamServer.Register(srv) s.externalACLServer.Register(srv) s.externalConnectCAServer.Register(srv) + s.resourceServiceServer.Register(srv) } return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil, s.incomingRPCLimiter) @@ -1334,23 +1350,50 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) { }) s.peerStreamServer.Register(s.externalGRPCServer) - resourcegrpc.NewServer(resourcegrpc.Config{ + s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{ Registry: s.typeRegistry, Backend: s.raftStorageBackend, ACLResolver: s.ACLResolver, Logger: logger.Named("grpc-api.resource"), - }).Register(s.externalGRPCServer) + }) + s.resourceServiceServer.Register(s.externalGRPCServer) } -func (s *Server) setupInternalResourceService(logger hclog.Logger) error { - server := grpc.NewServer() - - resourcegrpc.NewServer(resourcegrpc.Config{ +func (s *Server) setupInsecureResourceServiceClient(logger hclog.Logger) error { + server := resourcegrpc.NewServer(resourcegrpc.Config{ Registry: s.typeRegistry, Backend: s.raftStorageBackend, ACLResolver: resolver.DANGER_NO_AUTH{}, Logger: logger.Named("grpc-api.resource"), - }).Register(server) + }) + + conn, err := s.runInProcessGRPCServer(server.Register) + if err != nil { + return err + } + s.insecureResourceServiceClient = pbresource.NewResourceServiceClient(conn) + + return nil +} + +func (s *Server) setupSecureResourceServiceClient() error { + conn, err := s.runInProcessGRPCServer(s.resourceServiceServer.Register) + if err != nil { + return err + } + s.secureResourceServiceClient = pbresource.NewResourceServiceClient(conn) + + return nil +} + +// runInProcessGRPCServer runs a gRPC server that can only be accessed in the +// same process, rather than over the network, using a pipe listener. +func (s *Server) runInProcessGRPCServer(registerFn ...func(*grpc.Server)) (*grpc.ClientConn, error) { + server := grpc.NewServer() + + for _, fn := range registerFn { + fn(server) + } pipe := agentgrpc.NewPipeListener() go server.Serve(pipe) @@ -1367,15 +1410,14 @@ func (s *Server) setupInternalResourceService(logger hclog.Logger) error { ) if err != nil { server.Stop() - return err + return nil, err } go func() { <-s.shutdownCh conn.Close() }() - s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn) - return nil + return conn, nil } // Shutdown is used to shutdown the server @@ -2095,6 +2137,10 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback { } } +func (s *Server) ResourceServiceClient() pbresource.ResourceServiceClient { + return s.secureResourceServiceClient +} + func fileExists(name string) (bool, error) { _, err := os.Stat(name) if err == nil {