From 3b44be530d4522422b1183bae360afe630d57677 Mon Sep 17 00:00:00 2001 From: wangxinyi7 <121973291+wangxinyi7@users.noreply.github.com> Date: Wed, 31 Jan 2024 12:05:47 -0800 Subject: [PATCH] only forwarding the resource service traffic in client agent to server agent (#20347) * only forwarding the resource service traffic in client agent to server agent --- agent/agent.go | 7 ++++++ agent/consul/server_test.go | 2 +- agent/grpc-external/server.go | 39 ++++++++++++++++++++++++++++++- agent/grpc-external/stats_test.go | 2 +- agent/rpc/peering/service_test.go | 2 +- go.mod | 1 + go.sum | 6 +++++ 7 files changed, 55 insertions(+), 4 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ab10e21da2..9dd1b1cade 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -718,6 +718,7 @@ func (a *Agent) Start(ctx context.Context) error { Time: a.config.GRPCKeepaliveInterval, Timeout: a.config.GRPCKeepaliveTimeout, }, + nil, ) if a.baseDeps.UseV2Resources() { @@ -754,6 +755,11 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("can't start agent: client agents are not supported with v2 resources") } + // the conn is used to connect to the consul server agent + conn, err := a.baseDeps.GRPCConnPool.ClientConn(a.baseDeps.RuntimeConfig.Datacenter) + if err != nil { + return err + } a.externalGRPCServer = external.NewServer( a.logger.Named("grpc.external"), metrics.Default(), @@ -763,6 +769,7 @@ func (a *Agent) Start(ctx context.Context) error { Time: a.config.GRPCKeepaliveInterval, Timeout: a.config.GRPCKeepaliveTimeout, }, + conn, ) client, err := consul.NewClient(consulCfg, a.baseDeps.Deps) diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index c7b51c79fe..1e884e609e 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -353,7 +353,7 @@ func newServerWithDeps(t testutil.TestingTB, c *Config, deps Deps) (*Server, err oldNotify() } } - grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler(), keepalive.ServerParameters{}) + grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler(), keepalive.ServerParameters{}, nil) proxyUpdater := proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{}) srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger, proxyUpdater) if err != nil { diff --git a/agent/grpc-external/server.go b/agent/grpc-external/server.go index 30af8f2e6e..ea26301af3 100644 --- a/agent/grpc-external/server.go +++ b/agent/grpc-external/server.go @@ -4,20 +4,30 @@ package external import ( + "context" + "fmt" + "strings" "time" "github.com/armon/go-metrics" middleware "github.com/grpc-ecosystem/go-grpc-middleware" recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "github.com/hashi-derek/grpc-proxy/proxy" + "github.com/hashicorp/go-hclog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "github.com/hashicorp/consul/agent/consul/rate" agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/tlsutil" ) +const FORWARD_SERVICE_NAME_PREFIX = "hashicorp.consul." + var ( metricsLabels = []metrics.Label{{ Name: "server_type", @@ -28,11 +38,12 @@ var ( // NewServer constructs a gRPC server for the external gRPC port, to which // handlers can be registered. func NewServer( - logger agentmiddleware.Logger, + logger hclog.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter rate.RequestLimitsHandler, keepaliveParams keepalive.ServerParameters, + serverConn *grpc.ClientConn, ) *grpc.Server { if metricsObj == nil { metricsObj = metrics.Default() @@ -71,6 +82,11 @@ func NewServer( }), } + // forward FORWARD_SERVICE_NAME_PREFIX services from client agent to server agent + if serverConn != nil { + opts = append(opts, grpc.UnknownServiceHandler(proxy.TransparentHandler(makeDirector(serverConn, logger)))) + } + if tls != nil { // Attach TLS credentials, if provided. tlsCreds := agentmiddleware.NewOptionalTransportCredentials( @@ -80,3 +96,24 @@ func NewServer( } return grpc.NewServer(opts...) } + +func makeDirector(serverConn *grpc.ClientConn, logger hclog.Logger) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + var mdCopy metadata.MD + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + mdCopy = metadata.MD{} + } else { + mdCopy = md.Copy() + } + outCtx := metadata.NewOutgoingContext(ctx, mdCopy) + + logger.Debug("forwarding the request to the consul server", "method", fullMethodName) + // throw unimplemented error if the method is not meant to be forwarded + if !strings.HasPrefix(fullMethodName, FORWARD_SERVICE_NAME_PREFIX) { + return outCtx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName)) + } + + return outCtx, serverConn, nil + } +} diff --git a/agent/grpc-external/stats_test.go b/agent/grpc-external/stats_test.go index 3bd5c777cd..7f59871db1 100644 --- a/agent/grpc-external/stats_test.go +++ b/agent/grpc-external/stats_test.go @@ -28,7 +28,7 @@ import ( func TestServer_EmitsStats(t *testing.T) { sink, metricsObj := testutil.NewFakeSink(t) - srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{}) + srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{}, nil) testservice.RegisterSimpleServer(srv, &testservice.Simple{}) diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 5d78588a34..3d1e63d684 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -1835,7 +1835,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer { conf.ACLResolverSettings.EnterpriseMeta = *conf.AgentEnterpriseMeta() deps := newDefaultDeps(t, conf) - externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{}) + externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{}, nil) server, err := consul.NewServer(conf, deps, externalGRPCServer, nil, deps.Logger, nil) require.NoError(t, err) diff --git a/go.mod b/go.mod index afe31e38c9..b6ae1e7d40 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/hashi-derek/grpc-proxy v0.0.0-20231207191910-191266484d75 github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706 github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69 github.com/hashicorp/consul/api v1.26.1 diff --git a/go.sum b/go.sum index 2b17e3b7da..3d6e755264 100644 --- a/go.sum +++ b/go.sum @@ -443,6 +443,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk= github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w= +github.com/hashi-derek/grpc-proxy v0.0.0-20231207191910-191266484d75 h1:V5Uqf7VoWMd6UhNf/5EMA8LMPUm95GYvk2YF5SzT24o= +github.com/hashi-derek/grpc-proxy v0.0.0-20231207191910-191266484d75/go.mod h1:5eEnHfK72jOkp4gC1dI/Q/E9MFNOM/ewE/vql5ijV3g= github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706 h1:1ZEjnveDe20yFa6lSkfdQZm5BR/b271n0MsB5R2L3us= github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706/go.mod h1:1Cs8FlmD1BfSQXJGcFLSV5FuIx1AbJP+EJGdxosoS2g= github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69 h1:wzWurXrxfSyG1PHskIZlfuXlTSCj1Tsyatp9DtaasuY= @@ -1070,6 +1072,7 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= +golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -1170,6 +1173,7 @@ golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1365,6 +1369,7 @@ google.golang.org/genproto v0.0.0-20210222152913-aa3ee6e6a81c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= @@ -1482,6 +1487,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78= k8s.io/api v0.26.2 h1:dM3cinp3PGB6asOySalOZxEG4CZ0IAdJsrYZXE/ovGQ= k8s.io/api v0.26.2/go.mod h1:1kjMQsFE+QHPfskEcVNgL3+Hp88B80uj0QtSOlj8itU=