mirror of https://github.com/status-im/consul.git
Merge pull request #10895 from bigmikes/serve-panic-recovery
grpc, xds: recovery middleware to return and log error in case of panic
This commit is contained in:
commit
d9dd6944f5
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
grpc, xds: improved reliability of grpc and xds servers by adding recovery-middleware to return and log error in case of panic.
|
||||
```
|
|
@ -654,7 +654,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
|
|||
s.registerEnterpriseGRPCServices(deps, srv)
|
||||
}
|
||||
|
||||
return agentgrpc.NewHandler(config.RPCAddr, register)
|
||||
return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register)
|
||||
}
|
||||
|
||||
func (s *Server) connectCARootsMonitor(ctx context.Context) {
|
||||
|
|
|
@ -151,7 +151,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
|
|||
}, hclog.New(nil))
|
||||
require.NoError(t, err)
|
||||
|
||||
srv := newTestServer(t, "server-1", "dc1", tlsConf)
|
||||
srv := newSimpleTestServer(t, "server-1", "dc1", tlsConf)
|
||||
|
||||
md := srv.Metadata()
|
||||
res.AddServer(types.AreaWAN, md)
|
||||
|
@ -199,7 +199,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
|
|||
}, hclog.New(nil))
|
||||
require.NoError(t, err)
|
||||
|
||||
srv := newTestServer(t, "bob", "dc1", tlsConf)
|
||||
srv := newSimpleTestServer(t, "bob", "dc1", tlsConf)
|
||||
|
||||
// Send all of the traffic to dc1's server
|
||||
var p tcpproxy.Proxy
|
||||
|
@ -266,7 +266,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
|
|||
|
||||
for i := 0; i < count; i++ {
|
||||
name := fmt.Sprintf("server-%d", i)
|
||||
srv := newTestServer(t, name, "dc1", nil)
|
||||
srv := newSimpleTestServer(t, name, "dc1", nil)
|
||||
res.AddServer(types.AreaWAN, srv.Metadata())
|
||||
t.Cleanup(srv.shutdown)
|
||||
}
|
||||
|
@ -302,7 +302,7 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
|
|||
var servers []testServer
|
||||
for i := 0; i < count; i++ {
|
||||
name := fmt.Sprintf("server-%d", i)
|
||||
srv := newTestServer(t, name, "dc1", nil)
|
||||
srv := newSimpleTestServer(t, name, "dc1", nil)
|
||||
res.AddServer(types.AreaWAN, srv.Metadata())
|
||||
servers = append(servers, srv)
|
||||
t.Cleanup(srv.shutdown)
|
||||
|
@ -352,7 +352,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
|
|||
|
||||
for i := 0; i < count; i++ {
|
||||
name := fmt.Sprintf("server-%d", i)
|
||||
srv := newTestServer(t, name, "dc1", nil)
|
||||
srv := newSimpleTestServer(t, name, "dc1", nil)
|
||||
res.AddServer(types.AreaWAN, srv.Metadata())
|
||||
t.Cleanup(srv.shutdown)
|
||||
}
|
||||
|
@ -406,7 +406,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
|
|||
|
||||
for _, dc := range dcs {
|
||||
name := "server-0-" + dc
|
||||
srv := newTestServer(t, name, dc, nil)
|
||||
srv := newSimpleTestServer(t, name, dc, nil)
|
||||
res.AddServer(types.AreaWAN, srv.Metadata())
|
||||
t.Cleanup(srv.shutdown)
|
||||
}
|
||||
|
|
|
@ -9,29 +9,73 @@ import (
|
|||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
|
||||
// The register function will be called with the grpc.Server to register
|
||||
// gRPC services with the server.
|
||||
func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
|
||||
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)) *Handler {
|
||||
metrics := defaultMetrics()
|
||||
|
||||
// We don't need to pass tls.Config to the server since it's multiplexed
|
||||
// behind the RPC listener, which already has TLS configured.
|
||||
srv := grpc.NewServer(
|
||||
recoveryOpts := PanicHandlerMiddlewareOpts(logger)
|
||||
|
||||
opts := []grpc.ServerOption{
|
||||
grpc.StatsHandler(newStatsHandler(metrics)),
|
||||
grpc.StreamInterceptor((&activeStreamCounter{metrics: metrics}).Intercept),
|
||||
middleware.WithUnaryServerChain(
|
||||
// Add middlware interceptors to recover in case of panics.
|
||||
recovery.UnaryServerInterceptor(recoveryOpts...),
|
||||
),
|
||||
middleware.WithStreamServerChain(
|
||||
// Add middlware interceptors to recover in case of panics.
|
||||
recovery.StreamServerInterceptor(recoveryOpts...),
|
||||
(&activeStreamCounter{metrics: metrics}).Intercept,
|
||||
),
|
||||
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
||||
MinTime: 15 * time.Second,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
// We don't need to pass tls.Config to the server since it's multiplexed
|
||||
// behind the RPC listener, which already has TLS configured.
|
||||
srv := grpc.NewServer(opts...)
|
||||
register(srv)
|
||||
|
||||
lis := &chanListener{addr: addr, conns: make(chan net.Conn), done: make(chan struct{})}
|
||||
return &Handler{srv: srv, listener: lis}
|
||||
}
|
||||
|
||||
// PanicHandlerMiddlewareOpts returns the []recovery.Option containing
|
||||
// recovery handler function.
|
||||
func PanicHandlerMiddlewareOpts(logger Logger) []recovery.Option {
|
||||
return []recovery.Option{
|
||||
recovery.WithRecoveryHandler(NewPanicHandler(logger)),
|
||||
}
|
||||
}
|
||||
|
||||
// NewPanicHandler returns a recovery.RecoveryHandlerFunc closure function
|
||||
// to handle panic in GRPC server's handlers.
|
||||
func NewPanicHandler(logger Logger) recovery.RecoveryHandlerFunc {
|
||||
return func(p interface{}) (err error) {
|
||||
// Log the panic and the stack trace of the Goroutine that caused the panic.
|
||||
stacktrace := hclog.Stacktrace()
|
||||
logger.Error("panic serving grpc request",
|
||||
"panic", p,
|
||||
"stack", stacktrace,
|
||||
)
|
||||
|
||||
return status.Errorf(codes.Internal, "grpc: panic serving request")
|
||||
}
|
||||
}
|
||||
|
||||
// Handler implements a handler for the rpc server listener, and the
|
||||
// agent.Component interface for managing the lifecycle of the grpc.Server.
|
||||
type Handler struct {
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
|
||||
"github.com/hashicorp/consul/agent/grpc/resolver"
|
||||
)
|
||||
|
||||
func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
|
||||
// Prepare a logger with output to a buffer
|
||||
// so we can check what it writes.
|
||||
var buf bytes.Buffer
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Output: &buf,
|
||||
})
|
||||
|
||||
res := resolver.NewServerResolverBuilder(newConfig(t))
|
||||
registerWithGRPC(t, res)
|
||||
|
||||
srv := newPanicTestServer(t, logger, "server-1", "dc1", nil)
|
||||
res.AddServer(types.AreaWAN, srv.Metadata())
|
||||
t.Cleanup(srv.shutdown)
|
||||
|
||||
pool := NewClientConnPool(ClientConnPoolConfig{
|
||||
Servers: res,
|
||||
UseTLSForDC: useTLSForDcAlwaysTrue,
|
||||
DialingFromServer: true,
|
||||
DialingFromDatacenter: "dc1",
|
||||
})
|
||||
|
||||
conn, err := pool.ClientConn("dc1")
|
||||
require.NoError(t, err)
|
||||
client := testservice.NewSimpleClient(conn)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
resp, err := client.Something(ctx, &testservice.Req{})
|
||||
expectedErr := status.Errorf(codes.Internal, "grpc: panic serving request")
|
||||
require.Equal(t, expectedErr, err)
|
||||
require.Nil(t, resp)
|
||||
|
||||
// Read the log
|
||||
strLog := buf.String()
|
||||
// Checking the entire stack trace is not possible, let's
|
||||
// make sure that it contains a couple of expected strings.
|
||||
require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`)
|
||||
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc.(*simplePanic).Something`)
|
||||
}
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
|
@ -39,11 +40,22 @@ func (s testServer) Metadata() *metadata.Server {
|
|||
}
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T, name string, dc string, tlsConf *tlsutil.Configurator) testServer {
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
handler := NewHandler(addr, func(server *grpc.Server) {
|
||||
func newSimpleTestServer(t *testing.T, name, dc string, tlsConf *tlsutil.Configurator) testServer {
|
||||
return newTestServer(t, hclog.Default(), name, dc, tlsConf, func(server *grpc.Server) {
|
||||
testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc})
|
||||
})
|
||||
}
|
||||
|
||||
// newPanicTestServer sets up a simple server with handlers that panic.
|
||||
func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator) testServer {
|
||||
return newTestServer(t, logger, name, dc, tlsConf, func(server *grpc.Server) {
|
||||
testservice.RegisterSimpleServer(server, &simplePanic{name: name, dc: dc})
|
||||
})
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator, register func(server *grpc.Server)) testServer {
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
handler := NewHandler(logger, addr, register)
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
@ -103,6 +115,23 @@ func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.
|
|||
return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil
|
||||
}
|
||||
|
||||
type simplePanic struct {
|
||||
name, dc string
|
||||
}
|
||||
|
||||
func (s *simplePanic) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
|
||||
for flow.Context().Err() == nil {
|
||||
time.Sleep(time.Millisecond)
|
||||
panic("panic from Flow")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *simplePanic) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
|
||||
time.Sleep(time.Millisecond)
|
||||
panic("panic from Something")
|
||||
}
|
||||
|
||||
// fakeRPCListener mimics agent/consul.Server.listen to handle the RPCType byte.
|
||||
// In the future we should be able to refactor Server and extract this RPC
|
||||
// handling logic so that we don't need to use a fake.
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
func noopRegister(*grpc.Server) {}
|
||||
|
@ -23,7 +24,7 @@ func TestHandler_EmitsStats(t *testing.T) {
|
|||
sink, reset := patchGlobalMetrics(t)
|
||||
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
handler := NewHandler(addr, noopRegister)
|
||||
handler := NewHandler(hclog.Default(), addr, noopRegister)
|
||||
reset()
|
||||
|
||||
testservice.RegisterSimpleServer(handler.srv, &simple{})
|
||||
|
|
|
@ -320,7 +320,7 @@ var _ Backend = (*testBackend)(nil)
|
|||
func runTestServer(t *testing.T, server *Server) net.Addr {
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
var grpcServer *gogrpc.Server
|
||||
handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) {
|
||||
handler := grpc.NewHandler(hclog.New(nil), addr, func(srv *gogrpc.Server) {
|
||||
grpcServer = srv
|
||||
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server)
|
||||
})
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"time"
|
||||
|
||||
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
|
@ -18,6 +20,7 @@ import (
|
|||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
agentgrpc "github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
|
@ -219,8 +222,18 @@ func tokenFromContext(ctx context.Context) string {
|
|||
// NewGRPCServer creates a grpc.Server, registers the Server, and then returns
|
||||
// the grpc.Server.
|
||||
func NewGRPCServer(s *Server, tlsConfigurator *tlsutil.Configurator) *grpc.Server {
|
||||
recoveryOpts := agentgrpc.PanicHandlerMiddlewareOpts(s.Logger)
|
||||
|
||||
opts := []grpc.ServerOption{
|
||||
grpc.MaxConcurrentStreams(2048),
|
||||
middleware.WithUnaryServerChain(
|
||||
// Add middlware interceptors to recover in case of panics.
|
||||
recovery.UnaryServerInterceptor(recoveryOpts...),
|
||||
),
|
||||
middleware.WithStreamServerChain(
|
||||
// Add middlware interceptors to recover in case of panics.
|
||||
recovery.StreamServerInterceptor(recoveryOpts...),
|
||||
),
|
||||
}
|
||||
if tlsConfigurator != nil {
|
||||
if tlsConfigurator.Cert() != nil {
|
||||
|
|
3
go.mod
3
go.mod
|
@ -29,6 +29,7 @@ require (
|
|||
github.com/google/gofuzz v1.2.0
|
||||
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22
|
||||
github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
|
||||
github.com/hashicorp/consul/api v1.11.0
|
||||
github.com/hashicorp/consul/sdk v0.8.0
|
||||
github.com/hashicorp/go-bexpr v0.1.2
|
||||
|
@ -73,7 +74,7 @@ require (
|
|||
github.com/mitchellh/reflectwalk v1.0.1
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
|
||||
github.com/pkg/errors v0.8.1
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
|
||||
github.com/prometheus/client_golang v1.4.0
|
||||
github.com/rboyer/safeio v0.2.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -214,6 +214,7 @@ github.com/gophercloud/gophercloud v0.1.0 h1:P/nh25+rzXouhytV2pUHBb65fnds26Ghl8/
|
|||
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
|
||||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
|
||||
|
@ -426,8 +427,9 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
|
|||
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
|
||||
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
|
||||
|
|
Loading…
Reference in New Issue