re org resource type registry (#18133)

This commit is contained in:
wangxinyi7 2023-07-14 18:00:17 -07:00 committed by GitHub
parent 05b665e856
commit e7194787a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 55 additions and 26 deletions

View File

@ -14,6 +14,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -576,6 +578,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor, GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor,
EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c), EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c),
XDSStreamLimiter: limiter.NewSessionLimiter(), XDSStreamLimiter: limiter.NewSessionLimiter(),
Registry: resource.NewRegistry(),
} }
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
) )
@ -29,6 +30,7 @@ type Deps struct {
GRPCConnPool GRPCClientConner GRPCConnPool GRPCClientConner
LeaderForwarder LeaderForwarder LeaderForwarder LeaderForwarder
XDSStreamLimiter *limiter.SessionLimiter XDSStreamLimiter *limiter.SessionLimiter
Registry resource.Registry
// GetNetRPCInterceptorFunc, if not nil, sets the net/rpc rpc.ServerServiceCallInterceptor on // GetNetRPCInterceptorFunc, if not nil, sets the net/rpc rpc.ServerServiceCallInterceptor on
// the server side to record metrics around the RPC requests. If nil, no interceptor is added to // the server side to record metrics around the RPC requests. If nil, no interceptor is added to
// the rpc server. // the rpc server.

View File

@ -19,6 +19,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/hashicorp/consul/internal/resource"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
@ -72,8 +74,6 @@ import (
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo" "github.com/hashicorp/consul/internal/resource/demo"
"github.com/hashicorp/consul/internal/resource/reaper" "github.com/hashicorp/consul/internal/resource/reaper"
raftstorage "github.com/hashicorp/consul/internal/storage/raft" raftstorage "github.com/hashicorp/consul/internal/storage/raft"
@ -439,9 +439,6 @@ type Server struct {
// run by the Server // run by the Server
routineManager *routine.Manager routineManager *routine.Manager
// typeRegistry contains Consul's registered resource types.
typeRegistry resource.Registry
// resourceServiceServer implements the Resource Service. // resourceServiceServer implements the Resource Service.
resourceServiceServer *resourcegrpc.Server resourceServiceServer *resourcegrpc.Server
@ -536,7 +533,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
publisher: flat.EventPublisher, publisher: flat.EventPublisher,
incomingRPCLimiter: incomingRPCLimiter, incomingRPCLimiter: incomingRPCLimiter,
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)), routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
typeRegistry: resource.NewRegistry(),
} }
incomingRPCLimiter.Register(s) incomingRPCLimiter.Register(s)
@ -804,7 +800,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
go s.reportingManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) go s.reportingManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
// Initialize external gRPC server // Initialize external gRPC server
s.setupExternalGRPC(config, logger) s.setupExternalGRPC(config, flat.Registry, logger)
// Initialize internal gRPC server. // Initialize internal gRPC server.
// //
@ -817,7 +813,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return nil, err return nil, err
} }
if err := s.setupInsecureResourceServiceClient(logger); err != nil { if err := s.setupInsecureResourceServiceClient(flat.Registry, logger); err != nil {
return nil, err return nil, err
} }
@ -825,7 +821,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.insecureResourceServiceClient, s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime), logger.Named(logging.ControllerRuntime),
) )
s.registerResources(flat) s.registerControllers(flat)
go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})
go s.trackLeaderChanges() go s.trackLeaderChanges()
@ -876,18 +872,14 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return s, nil return s, nil
} }
func (s *Server) registerResources(deps Deps) { func (s *Server) registerControllers(deps Deps) {
if stringslice.Contains(deps.Experiments, catalogResourceExperimentName) { if stringslice.Contains(deps.Experiments, catalogResourceExperimentName) {
catalog.RegisterTypes(s.typeRegistry)
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies()) catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
mesh.RegisterTypes(s.typeRegistry)
} }
reaper.RegisterControllers(s.controllerManager) reaper.RegisterControllers(s.controllerManager)
if s.config.DevMode { if s.config.DevMode {
demo.RegisterTypes(s.typeRegistry)
demo.RegisterControllers(s.controllerManager) demo.RegisterControllers(s.controllerManager)
} }
} }
@ -1285,7 +1277,7 @@ func (s *Server) setupRPC() error {
} }
// Initialize and register services on external gRPC server. // Initialize and register services on external gRPC server.
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) { func (s *Server) setupExternalGRPC(config *Config, typeRegistry resource.Registry, logger hclog.Logger) {
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{ s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled, ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) { ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
@ -1351,7 +1343,7 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
s.peerStreamServer.Register(s.externalGRPCServer) s.peerStreamServer.Register(s.externalGRPCServer)
s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{ s.resourceServiceServer = resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry, Registry: typeRegistry,
Backend: s.raftStorageBackend, Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver, ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"), Logger: logger.Named("grpc-api.resource"),
@ -1359,9 +1351,9 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
s.resourceServiceServer.Register(s.externalGRPCServer) s.resourceServiceServer.Register(s.externalGRPCServer)
} }
func (s *Server) setupInsecureResourceServiceClient(logger hclog.Logger) error { func (s *Server) setupInsecureResourceServiceClient(typeRegistry resource.Registry, logger hclog.Logger) error {
server := resourcegrpc.NewServer(resourcegrpc.Config{ server := resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry, Registry: typeRegistry,
Backend: s.raftStorageBackend, Backend: s.raftStorageBackend,
ACLResolver: resolver.DANGER_NO_AUTH{}, ACLResolver: resolver.DANGER_NO_AUTH{},
Logger: logger.Named("grpc-api.resource"), Logger: logger.Named("grpc-api.resource"),

View File

@ -0,0 +1,25 @@
package consul
import (
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/mesh"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo"
)
// NewTypeRegistry returns a registry populated with all supported resource
// types.
//
// Note: the registry includes resource types that may not be suitable for
// production use (e.g. experimental or development resource types) because
// it is used in the CLI, where feature flags and other runtime configuration
// may not be available.
func NewTypeRegistry() resource.Registry {
registry := resource.NewRegistry()
demo.RegisterTypes(registry)
mesh.RegisterTypes(registry)
catalog.RegisterTypes(registry)
return registry
}

View File

@ -15,6 +15,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/internal/resource"
"github.com/google/tcpproxy" "github.com/google/tcpproxy"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
@ -1950,6 +1952,7 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps {
NewRequestRecorderFunc: middleware.NewRequestRecorder, NewRequestRecorderFunc: middleware.NewRequestRecorder,
GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor, GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor,
XDSStreamLimiter: limiter.NewSessionLimiter(), XDSStreamLimiter: limiter.NewSessionLimiter(),
Registry: resource.NewRegistry(),
} }
} }

View File

@ -260,6 +260,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
d.XDSStreamLimiter = limiter.NewSessionLimiter() d.XDSStreamLimiter = limiter.NewSessionLimiter()
d.Registry = consul.NewTypeRegistry()
return d, nil return d, nil
} }

View File

@ -61,8 +61,10 @@ func RegisterTypes(r resource.Registry) {
} }
``` ```
Update the `registerResources` method in [`server.go`] to call your package's Update the `NewTypeRegistry` method in [`type_registry.go`] to call your
type registration method: package's type registration method:
[`type_registry.go`]: ../../agent/consul/type_registry.go
```Go ```Go
import ( import (
@ -71,15 +73,13 @@ import (
// … // …
) )
func (s *Server) registerResources() { func NewTypeRegistry() resource.Registry {
// … // …
foo.RegisterTypes(s.typeRegistry) foo.RegisterTypes(registry)
// … // …
} }
``` ```
[`server.go`]: ../../agent/consul/server.go
That should be all you need to start using your new resource type. Test it out That should be all you need to start using your new resource type. Test it out
by starting an agent in dev mode: by starting an agent in dev mode:
@ -277,7 +277,9 @@ func (barReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
Next, register your controller with the controller manager. Another common Next, register your controller with the controller manager. Another common
pattern is to have your package expose a method for registering controllers, pattern is to have your package expose a method for registering controllers,
which is also called from `registerResources` in [`server.go`]. which is called from `registerControllers` in [`server.go`].
[`server.go`]: ../../agent/consul/server.go
```Go ```Go
package foo package foo
@ -290,7 +292,7 @@ func RegisterControllers(mgr *controller.Manager) {
```Go ```Go
package consul package consul
func (s *Server) registerResources() { func (s *Server) registerControllers() {
// … // …
foo.RegisterControllers(s.controllerManager) foo.RegisterControllers(s.controllerManager)
// … // …