NET-4657/add resource service client (#18053)

### Description

<!-- Please describe why you're making this change, in plain English.
-->
Dan had already started on this
[task](https://github.com/hashicorp/consul/pull/17849) which is needed
to start building the HTTP APIs. This just needed some cleanup to get it
ready for review.

Overview:

- Rename `internalResourceServiceClient` to
`insecureResourceServiceClient` for name consistency
- Configure a `secureResourceServiceClient` with auth enabled

### PR Checklist

* [ ] ~updated test coverage~
* [ ] ~external facing docs updated~
* [x] appropriate backport labels added
* [ ] ~not a security concern~
This commit is contained in:
Poonam Jadhav 2023-07-14 14:09:02 -04:00 committed by GitHub
parent ad6364af9e
commit 5208ea90e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 85 additions and 16 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
@ -163,6 +164,9 @@ func (a *TestACLAgent) Stats() map[string]map[string]string {
func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error { func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error {
return fmt.Errorf("Unimplemented") return fmt.Errorf("Unimplemented")
} }
func (a *TestACLAgent) ResourceServiceClient() pbresource.ResourceServiceClient {
return nil
}
func TestACL_Version8EnabledByDefault(t *testing.T) { func TestACL_Version8EnabledByDefault(t *testing.T) {
t.Parallel() t.Parallel()

View File

@ -71,6 +71,7 @@ import (
"github.com/hashicorp/consul/lib/mutex" "github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pboperator" "github.com/hashicorp/consul/proto/private/pboperator"
"github.com/hashicorp/consul/proto/private/pbpeering" "github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
@ -198,6 +199,9 @@ type delegate interface {
RPC(ctx context.Context, method string, args interface{}, reply interface{}) error RPC(ctx context.Context, method string, args interface{}, reply interface{}) error
// ResourceServiceClient is a client for the gRPC Resource Service.
ResourceServiceClient() pbresource.ResourceServiceClient
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
Shutdown() error Shutdown() error
Stats() map[string]map[string]string Stats() map[string]map[string]string

View File

@ -25,6 +25,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
@ -93,6 +94,9 @@ type Client struct {
EnterpriseClient EnterpriseClient
tlsConfigurator *tlsutil.Configurator tlsConfigurator *tlsutil.Configurator
// resourceServiceClient is a client for the gRPC Resource Service.
resourceServiceClient pbresource.ResourceServiceClient
} }
// NewClient creates and returns a Client // NewClient creates and returns a Client
@ -151,6 +155,13 @@ func NewClient(config *Config, deps Deps) (*Client, error) {
} }
c.router = deps.Router c.router = deps.Router
conn, err := deps.GRPCConnPool.ClientConn(deps.ConnPool.Datacenter)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to get gRPC client connection: %w", err)
}
c.resourceServiceClient = pbresource.NewResourceServiceClient(conn)
// Start LAN event handlers after the router is complete since the event // Start LAN event handlers after the router is complete since the event
// handlers depend on the router and the router depends on Serf. // handlers depend on the router and the router depends on Serf.
go c.lanEventHandler() go c.lanEventHandler()
@ -451,3 +462,7 @@ func (c *Client) AgentEnterpriseMeta() *acl.EnterpriseMeta {
func (c *Client) agentSegmentName() string { func (c *Client) agentSegmentName() string {
return c.config.Segment return c.config.Segment
} }
func (c *Client) ResourceServiceClient() pbresource.ResourceServiceClient {
return c.resourceServiceClient
}

View File

@ -442,10 +442,20 @@ type Server struct {
// typeRegistry contains Consul's registered resource types. // typeRegistry contains Consul's registered resource types.
typeRegistry resource.Registry typeRegistry resource.Registry
// internalResourceServiceClient is a client that can be used to communicate // resourceServiceServer implements the Resource Service.
// with the Resource Service in-process (i.e. not via the network) without auth. resourceServiceServer *resourcegrpc.Server
// It should only be used for purely-internal workloads, such as controllers.
internalResourceServiceClient pbresource.ResourceServiceClient // insecureResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) *without*
// auth. It should only be used for purely-internal workloads, such as
// controllers.
insecureResourceServiceClient pbresource.ResourceServiceClient
// secureResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) *with* auth.
// It can be used to make requests to the Resource Service on behalf of the user
// (e.g. from the HTTP API).
secureResourceServiceClient pbresource.ResourceServiceClient
// controllerManager schedules the execution of controllers. // controllerManager schedules the execution of controllers.
controllerManager *controller.Manager controllerManager *controller.Manager
@ -803,11 +813,16 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s) s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder s.grpcLeaderForwarder = flat.LeaderForwarder
if err := s.setupInternalResourceService(logger); err != nil { if err := s.setupSecureResourceServiceClient(); err != nil {
return nil, err return nil, err
} }
if err := s.setupInsecureResourceServiceClient(logger); err != nil {
return nil, err
}
s.controllerManager = controller.NewManager( s.controllerManager = controller.NewManager(
s.internalResourceServiceClient, s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime), logger.Named(logging.ControllerRuntime),
) )
s.registerResources(flat) s.registerResources(flat)
@ -929,6 +944,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
s.peerStreamServer.Register(srv) s.peerStreamServer.Register(srv)
s.externalACLServer.Register(srv) s.externalACLServer.Register(srv)
s.externalConnectCAServer.Register(srv) s.externalConnectCAServer.Register(srv)
s.resourceServiceServer.Register(srv)
} }
return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil, s.incomingRPCLimiter) return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil, s.incomingRPCLimiter)
@ -1334,23 +1350,50 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
}) })
s.peerStreamServer.Register(s.externalGRPCServer) s.peerStreamServer.Register(s.externalGRPCServer)
resourcegrpc.NewServer(resourcegrpc.Config{ s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry, Registry: s.typeRegistry,
Backend: s.raftStorageBackend, Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver, ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"), Logger: logger.Named("grpc-api.resource"),
}).Register(s.externalGRPCServer) })
s.resourceServiceServer.Register(s.externalGRPCServer)
} }
func (s *Server) setupInternalResourceService(logger hclog.Logger) error { func (s *Server) setupInsecureResourceServiceClient(logger hclog.Logger) error {
server := grpc.NewServer() server := resourcegrpc.NewServer(resourcegrpc.Config{
resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry, Registry: s.typeRegistry,
Backend: s.raftStorageBackend, Backend: s.raftStorageBackend,
ACLResolver: resolver.DANGER_NO_AUTH{}, ACLResolver: resolver.DANGER_NO_AUTH{},
Logger: logger.Named("grpc-api.resource"), Logger: logger.Named("grpc-api.resource"),
}).Register(server) })
conn, err := s.runInProcessGRPCServer(server.Register)
if err != nil {
return err
}
s.insecureResourceServiceClient = pbresource.NewResourceServiceClient(conn)
return nil
}
func (s *Server) setupSecureResourceServiceClient() error {
conn, err := s.runInProcessGRPCServer(s.resourceServiceServer.Register)
if err != nil {
return err
}
s.secureResourceServiceClient = pbresource.NewResourceServiceClient(conn)
return nil
}
// runInProcessGRPCServer runs a gRPC server that can only be accessed in the
// same process, rather than over the network, using a pipe listener.
func (s *Server) runInProcessGRPCServer(registerFn ...func(*grpc.Server)) (*grpc.ClientConn, error) {
server := grpc.NewServer()
for _, fn := range registerFn {
fn(server)
}
pipe := agentgrpc.NewPipeListener() pipe := agentgrpc.NewPipeListener()
go server.Serve(pipe) go server.Serve(pipe)
@ -1367,15 +1410,14 @@ func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
) )
if err != nil { if err != nil {
server.Stop() server.Stop()
return err return nil, err
} }
go func() { go func() {
<-s.shutdownCh <-s.shutdownCh
conn.Close() conn.Close()
}() }()
s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn)
return nil return conn, nil
} }
// Shutdown is used to shutdown the server // Shutdown is used to shutdown the server
@ -2095,6 +2137,10 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback {
} }
} }
func (s *Server) ResourceServiceClient() pbresource.ResourceServiceClient {
return s.secureResourceServiceClient
}
func fileExists(name string) (bool, error) { func fileExists(name string) (bool, error) {
_, err := os.Stat(name) _, err := os.Stat(name)
if err == nil { if err == nil {