mirror of https://github.com/status-im/consul.git
only forwarding the resource service traffic in client agent to server agent (#20347)
* only forwarding the resource service traffic in client agent to server agent
This commit is contained in:
parent
383d92e9ab
commit
3b44be530d
|
@ -718,6 +718,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
Time: a.config.GRPCKeepaliveInterval,
|
||||
Timeout: a.config.GRPCKeepaliveTimeout,
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
if a.baseDeps.UseV2Resources() {
|
||||
|
@ -754,6 +755,11 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
return fmt.Errorf("can't start agent: client agents are not supported with v2 resources")
|
||||
}
|
||||
|
||||
// the conn is used to connect to the consul server agent
|
||||
conn, err := a.baseDeps.GRPCConnPool.ClientConn(a.baseDeps.RuntimeConfig.Datacenter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.externalGRPCServer = external.NewServer(
|
||||
a.logger.Named("grpc.external"),
|
||||
metrics.Default(),
|
||||
|
@ -763,6 +769,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
Time: a.config.GRPCKeepaliveInterval,
|
||||
Timeout: a.config.GRPCKeepaliveTimeout,
|
||||
},
|
||||
conn,
|
||||
)
|
||||
|
||||
client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
|
||||
|
|
|
@ -353,7 +353,7 @@ func newServerWithDeps(t testutil.TestingTB, c *Config, deps Deps) (*Server, err
|
|||
oldNotify()
|
||||
}
|
||||
}
|
||||
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler(), keepalive.ServerParameters{})
|
||||
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler(), keepalive.ServerParameters{}, nil)
|
||||
proxyUpdater := proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{})
|
||||
srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger, proxyUpdater)
|
||||
if err != nil {
|
||||
|
|
|
@ -4,20 +4,30 @@
|
|||
package external
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
||||
"github.com/hashi-derek/grpc-proxy/proxy"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/rate"
|
||||
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
const FORWARD_SERVICE_NAME_PREFIX = "hashicorp.consul."
|
||||
|
||||
var (
|
||||
metricsLabels = []metrics.Label{{
|
||||
Name: "server_type",
|
||||
|
@ -28,11 +38,12 @@ var (
|
|||
// NewServer constructs a gRPC server for the external gRPC port, to which
|
||||
// handlers can be registered.
|
||||
func NewServer(
|
||||
logger agentmiddleware.Logger,
|
||||
logger hclog.Logger,
|
||||
metricsObj *metrics.Metrics,
|
||||
tls *tlsutil.Configurator,
|
||||
limiter rate.RequestLimitsHandler,
|
||||
keepaliveParams keepalive.ServerParameters,
|
||||
serverConn *grpc.ClientConn,
|
||||
) *grpc.Server {
|
||||
if metricsObj == nil {
|
||||
metricsObj = metrics.Default()
|
||||
|
@ -71,6 +82,11 @@ func NewServer(
|
|||
}),
|
||||
}
|
||||
|
||||
// forward FORWARD_SERVICE_NAME_PREFIX services from client agent to server agent
|
||||
if serverConn != nil {
|
||||
opts = append(opts, grpc.UnknownServiceHandler(proxy.TransparentHandler(makeDirector(serverConn, logger))))
|
||||
}
|
||||
|
||||
if tls != nil {
|
||||
// Attach TLS credentials, if provided.
|
||||
tlsCreds := agentmiddleware.NewOptionalTransportCredentials(
|
||||
|
@ -80,3 +96,24 @@ func NewServer(
|
|||
}
|
||||
return grpc.NewServer(opts...)
|
||||
}
|
||||
|
||||
func makeDirector(serverConn *grpc.ClientConn, logger hclog.Logger) func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
|
||||
return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
|
||||
var mdCopy metadata.MD
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
mdCopy = metadata.MD{}
|
||||
} else {
|
||||
mdCopy = md.Copy()
|
||||
}
|
||||
outCtx := metadata.NewOutgoingContext(ctx, mdCopy)
|
||||
|
||||
logger.Debug("forwarding the request to the consul server", "method", fullMethodName)
|
||||
// throw unimplemented error if the method is not meant to be forwarded
|
||||
if !strings.HasPrefix(fullMethodName, FORWARD_SERVICE_NAME_PREFIX) {
|
||||
return outCtx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName))
|
||||
}
|
||||
|
||||
return outCtx, serverConn, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import (
|
|||
func TestServer_EmitsStats(t *testing.T) {
|
||||
sink, metricsObj := testutil.NewFakeSink(t)
|
||||
|
||||
srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{})
|
||||
srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{}, nil)
|
||||
|
||||
testservice.RegisterSimpleServer(srv, &testservice.Simple{})
|
||||
|
||||
|
|
|
@ -1835,7 +1835,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
|
|||
conf.ACLResolverSettings.EnterpriseMeta = *conf.AgentEnterpriseMeta()
|
||||
|
||||
deps := newDefaultDeps(t, conf)
|
||||
externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{})
|
||||
externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler(), keepalive.ServerParameters{}, nil)
|
||||
|
||||
server, err := consul.NewServer(conf, deps, externalGRPCServer, nil, deps.Logger, nil)
|
||||
require.NoError(t, err)
|
||||
|
|
1
go.mod
1
go.mod
|
@ -37,6 +37,7 @@ require (
|
|||
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1
|
||||
github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/hashi-derek/grpc-proxy v0.0.0-20231207191910-191266484d75
|
||||
github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706
|
||||
github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69
|
||||
github.com/hashicorp/consul/api v1.26.1
|
||||
|
|
6
go.sum
6
go.sum
|
@ -443,6 +443,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb
|
|||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
|
||||
github.com/hashi-derek/grpc-proxy v0.0.0-20231207191910-191266484d75 h1:V5Uqf7VoWMd6UhNf/5EMA8LMPUm95GYvk2YF5SzT24o=
|
||||
github.com/hashi-derek/grpc-proxy v0.0.0-20231207191910-191266484d75/go.mod h1:5eEnHfK72jOkp4gC1dI/Q/E9MFNOM/ewE/vql5ijV3g=
|
||||
github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706 h1:1ZEjnveDe20yFa6lSkfdQZm5BR/b271n0MsB5R2L3us=
|
||||
github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706/go.mod h1:1Cs8FlmD1BfSQXJGcFLSV5FuIx1AbJP+EJGdxosoS2g=
|
||||
github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69 h1:wzWurXrxfSyG1PHskIZlfuXlTSCj1Tsyatp9DtaasuY=
|
||||
|
@ -1070,6 +1072,7 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v
|
|||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
|
||||
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
|
||||
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
|
@ -1170,6 +1173,7 @@ golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
|
@ -1365,6 +1369,7 @@ google.golang.org/genproto v0.0.0-20210222152913-aa3ee6e6a81c/go.mod h1:FWY/as6D
|
|||
google.golang.org/genproto v0.0.0-20210303154014-9728d6b83eeb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
|
||||
google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
|
||||
google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
|
||||
google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
|
||||
google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
|
||||
google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A=
|
||||
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
|
||||
|
@ -1482,6 +1487,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
|
||||
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
|
||||
k8s.io/api v0.26.2 h1:dM3cinp3PGB6asOySalOZxEG4CZ0IAdJsrYZXE/ovGQ=
|
||||
k8s.io/api v0.26.2/go.mod h1:1kjMQsFE+QHPfskEcVNgL3+Hp88B80uj0QtSOlj8itU=
|
||||
|
|
Loading…
Reference in New Issue