From 233eacf0a4ba85ac360f588d139f9d9799bf200a Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Fri, 6 Jan 2023 13:33:53 -0500 Subject: [PATCH] inject logger and create logdrop sink (#15822) * inject logger and create logdrop sink * init sink with an empty struct instead of nil * wrap a logger instead of a sink and add a discard logger to avoid double logging * fix compile errors * fix linter errors * Fix bug where log arguments aren't properly formatted * Move log sink construction outside of handler * Add prometheus definition and docs for log drop counter Co-authored-by: Daniel Upton --- agent/agent.go | 7 ++++- agent/consul/rate/handler.go | 9 +++---- agent/consul/rate/metrics.go | 10 +++++++ agent/consul/server.go | 16 +++++++----- agent/log-drop/log-drop.go | 28 ++++++++++---------- agent/log-drop/log-drop_test.go | 26 ++++++++++--------- agent/log-drop/mock_Logger.go | 33 ++++++++++++++++++++++++ agent/log-drop/mock_SinkAdapter.go | 33 ------------------------ agent/setup.go | 2 ++ website/content/docs/agent/telemetry.mdx | 1 + 10 files changed, 92 insertions(+), 73 deletions(-) create mode 100644 agent/consul/rate/metrics.go create mode 100644 agent/log-drop/mock_Logger.go delete mode 100644 agent/log-drop/mock_SinkAdapter.go diff --git a/agent/agent.go b/agent/agent.go index 0d581355d9..a19da5c70a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -603,7 +603,12 @@ func (a *Agent) Start(ctx context.Context) error { // Setup either the client or the server. if c.ServerMode { serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer) - incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter(serverLogger, consulCfg) + + incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter( + &lib.StopChannelContext{StopCh: a.shutdownCh}, + serverLogger, + consulCfg, + ) a.externalGRPCServer = external.NewServer( a.logger.Named("grpc.external"), diff --git a/agent/consul/rate/handler.go b/agent/consul/rate/handler.go index bfdb587f4f..15b49e6bf4 100644 --- a/agent/consul/rate/handler.go +++ b/agent/consul/rate/handler.go @@ -9,8 +9,9 @@ import ( "reflect" "sync/atomic" - "github.com/hashicorp/consul/agent/consul/multilimiter" "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/agent/consul/multilimiter" ) var ( @@ -119,8 +120,6 @@ type Handler struct { limiter multilimiter.RateLimiter - // TODO: replace this with the real logger. - // https://github.com/hashicorp/consul/pull/15822 logger hclog.Logger } @@ -205,8 +204,7 @@ func (h *Handler) Allow(op Operation) error { continue } - // TODO: metrics. - // TODO: is this the correct log-level? + // TODO(NET-1382): is this the correct log-level? enforced := l.mode == ModeEnforcing h.logger.Trace("RPC exceeded allowed rate limit", @@ -217,6 +215,7 @@ func (h *Handler) Allow(op Operation) error { ) if enforced { + // TODO(NET-1382) - use the logger to print rate limiter logs. if h.leaderStatusProvider.IsLeader() && op.Type == OperationTypeWrite { return ErrRetryLater } diff --git a/agent/consul/rate/metrics.go b/agent/consul/rate/metrics.go new file mode 100644 index 0000000000..c49f0e91e4 --- /dev/null +++ b/agent/consul/rate/metrics.go @@ -0,0 +1,10 @@ +package rate + +import "github.com/armon/go-metrics/prometheus" + +var Counters = []prometheus.CounterDefinition{ + { + Name: []string{"rpc", "rate_limit", "log_dropped"}, + Help: "Increments whenever a log that is emitted because an RPC exceeded a rate limit gets dropped because the output buffer is full.", + }, +} diff --git a/agent/consul/server.go b/agent/consul/server.go index b67e59c108..89210f5f0c 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -49,6 +49,7 @@ import ( agentgrpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/services/subscribe" "github.com/hashicorp/consul/agent/hcp" + logdrop "github.com/hashicorp/consul/agent/log-drop" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -1846,7 +1847,7 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback { } } -func ConfiguredIncomingRPCLimiter(serverLogger hclog.InterceptLogger, consulCfg *Config) *rpcRate.Handler { +func ConfiguredIncomingRPCLimiter(ctx context.Context, serverLogger hclog.InterceptLogger, consulCfg *Config) *rpcRate.Handler { mlCfg := &multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second} limitsConfig := &RequestLimits{ Mode: rpcRate.RequestLimitsModeFromNameWithDefault(consulCfg.RequestLimitsMode), @@ -1854,14 +1855,15 @@ func ConfiguredIncomingRPCLimiter(serverLogger hclog.InterceptLogger, consulCfg WriteRate: consulCfg.RequestLimitsWriteRate, } + sink := logdrop.NewLogDropSink(ctx, 100, serverLogger.Named("rpc-rate-limit"), func(l logdrop.Log) { + metrics.IncrCounter([]string{"rpc", "rate_limit", "log_dropped"}, 1) + }) + logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{Output: io.Discard}) + logger.RegisterSink(sink) + rateLimiterConfig := convertConsulConfigToRateLimitHandlerConfig(*limitsConfig, mlCfg) - incomingRPCLimiter := rpcRate.NewHandler( - *rateLimiterConfig, - serverLogger.Named("rpc-rate-limit"), - ) - - return incomingRPCLimiter + return rpcRate.NewHandler(*rateLimiterConfig, logger) } func convertConsulConfigToRateLimitHandlerConfig(limitsConfig RequestLimits, multilimiterConfig *multilimiter.Config) *rpcRate.HandlerConfig { diff --git a/agent/log-drop/log-drop.go b/agent/log-drop/log-drop.go index f10f0f6717..49991c4e0f 100644 --- a/agent/log-drop/log-drop.go +++ b/agent/log-drop/log-drop.go @@ -2,34 +2,33 @@ package logdrop import ( "context" + "github.com/hashicorp/go-hclog" ) -// SinkAdapter mimic the interface from hclog.SinkAdapter +// Logger mimic the interface from hclog.Logger // -//go:generate mockery --name SinkAdapter --inpackage -type SinkAdapter interface { - Accept(name string, level hclog.Level, msg string, args ...interface{}) +//go:generate mockery --name Logger --inpackage +type Logger interface { + Log(level hclog.Level, msg string, args ...interface{}) } type Log struct { - n string s string i []interface{} l hclog.Level } type logDropSink struct { - sink SinkAdapter + logger Logger logCh chan Log - name string dropFn func(l Log) } // Accept consume a log and push it into a channel, // if the channel is filled it will call dropFn -func (r *logDropSink) Accept(name string, level hclog.Level, msg string, args ...interface{}) { - r.pushLog(Log{n: name, l: level, s: msg, i: args}) +func (r *logDropSink) Accept(_ string, level hclog.Level, msg string, args ...interface{}) { + r.pushLog(Log{l: level, s: msg, i: args}) } func (r *logDropSink) pushLog(l Log) { @@ -44,21 +43,20 @@ func (r *logDropSink) logConsumer(ctx context.Context) { for { select { case l := <-r.logCh: - r.sink.Accept(l.n, l.l, l.s, l.i) + r.logger.Log(l.l, l.s, l.i...) case <-ctx.Done(): return } } } -// NewLogDropSink create a log SinkAdapter that wrap another SinkAdapter +// NewLogDropSink create a log Logger that wrap another Logger // It also create a go routine for consuming logs, the given context need to be canceled -// to properly deallocate the SinkAdapter. -func NewLogDropSink(ctx context.Context, name string, depth int, sink SinkAdapter, dropFn func(l Log)) hclog.SinkAdapter { +// to properly deallocate the Logger. +func NewLogDropSink(ctx context.Context, depth int, logger Logger, dropFn func(l Log)) hclog.SinkAdapter { r := &logDropSink{ - sink: sink, + logger: logger, logCh: make(chan Log, depth), - name: name, dropFn: dropFn, } go r.logConsumer(ctx) diff --git a/agent/log-drop/log-drop_test.go b/agent/log-drop/log-drop_test.go index cb85f86ee7..aadc8f9a8e 100644 --- a/agent/log-drop/log-drop_test.go +++ b/agent/log-drop/log-drop_test.go @@ -2,37 +2,39 @@ package logdrop import ( "context" - "github.com/hashicorp/consul/sdk/testutil/retry" + "testing" + "time" + "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "testing" - "time" + + "github.com/hashicorp/consul/sdk/testutil/retry" ) func TestNewLogDrop(t *testing.T) { - mockLogger := NewMockSinkAdapter(t) - mockLogger.On("Accept", "test Accept", hclog.Info, "hello", []interface{}{"test", 0}).Return() - ld := NewLogDropSink(context.Background(), "test", 10, mockLogger, func(_ Log) {}) + mockLogger := NewMockLogger(t) + mockLogger.On("Log", hclog.Info, "hello", "test", 0).Return() + ld := NewLogDropSink(context.Background(), 10, mockLogger, func(_ Log) {}) require.NotNil(t, ld) - ld.Accept("test Accept", hclog.Info, "hello", "test", 0) + ld.Accept("test Log", hclog.Info, "hello", "test", 0) retry.Run(t, func(r *retry.R) { - mockLogger.AssertNumberOfCalls(r, "Accept", 1) + mockLogger.AssertNumberOfCalls(r, "Log", 1) }) } func TestLogDroppedWhenChannelFilled(t *testing.T) { - mockLogger := NewMockSinkAdapter(t) + mockLogger := NewMockLogger(t) ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() block := make(chan interface{}) - mockLogger.On("Accept", "test", hclog.Debug, "hello", []interface{}(nil)).Run(func(args mock.Arguments) { + mockLogger.On("Log", hclog.Debug, "hello").Run(func(args mock.Arguments) { <-block }) var called = make(chan interface{}) - ld := NewLogDropSink(ctx, "test", 1, mockLogger, func(l Log) { + ld := NewLogDropSink(ctx, 1, mockLogger, func(l Log) { close(called) }) for i := 0; i < 2; i++ { @@ -46,6 +48,6 @@ func TestLogDroppedWhenChannelFilled(t *testing.T) { t.Fatal("timeout waiting for drop func to be called") } retry.Run(t, func(r *retry.R) { - mockLogger.AssertNumberOfCalls(r, "Accept", 1) + mockLogger.AssertNumberOfCalls(r, "Log", 1) }) } diff --git a/agent/log-drop/mock_Logger.go b/agent/log-drop/mock_Logger.go new file mode 100644 index 0000000000..bab4207d37 --- /dev/null +++ b/agent/log-drop/mock_Logger.go @@ -0,0 +1,33 @@ +// Code generated by mockery v2.12.2. DO NOT EDIT. + +package logdrop + +import ( + hclog "github.com/hashicorp/go-hclog" + mock "github.com/stretchr/testify/mock" + + testing "testing" +) + +// MockLogger is an autogenerated mock type for the Logger type +type MockLogger struct { + mock.Mock +} + +// Log provides a mock function with given fields: level, msg, args +func (_m *MockLogger) Log(level hclog.Level, msg string, args ...interface{}) { + var _ca []interface{} + _ca = append(_ca, level, msg) + _ca = append(_ca, args...) + _m.Called(_ca...) +} + +// NewMockLogger creates a new instance of MockLogger. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockLogger(t testing.TB) *MockLogger { + mock := &MockLogger{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/log-drop/mock_SinkAdapter.go b/agent/log-drop/mock_SinkAdapter.go deleted file mode 100644 index ea7a75901b..0000000000 --- a/agent/log-drop/mock_SinkAdapter.go +++ /dev/null @@ -1,33 +0,0 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. - -package logdrop - -import ( - hclog "github.com/hashicorp/go-hclog" - mock "github.com/stretchr/testify/mock" - - testing "testing" -) - -// MockSinkAdapter is an autogenerated mock type for the SinkAdapter type -type MockSinkAdapter struct { - mock.Mock -} - -// Accept provides a mock function with given fields: name, level, msg, args -func (_m *MockSinkAdapter) Accept(name string, level hclog.Level, msg string, args ...interface{}) { - var _ca []interface{} - _ca = append(_ca, name, level, msg) - _ca = append(_ca, args...) - _m.Called(_ca...) -} - -// NewMockSinkAdapter creates a new instance of MockSinkAdapter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockSinkAdapter(t testing.TB) *MockSinkAdapter { - mock := &MockSinkAdapter{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/agent/setup.go b/agent/setup.go index f5e0e8981d..01d7b7593f 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/consul/xdscapacity" @@ -313,6 +314,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau local.StateCounters, xds.StatsCounters, raftCounters, + rate.Counters, } // Flatten definitions // NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique? diff --git a/website/content/docs/agent/telemetry.mdx b/website/content/docs/agent/telemetry.mdx index 4e56641e68..cf12ed8295 100644 --- a/website/content/docs/agent/telemetry.mdx +++ b/website/content/docs/agent/telemetry.mdx @@ -477,6 +477,7 @@ These metrics are used to monitor the health of the Consul servers. | `consul.raft.transition.heartbeat_timeout` | The number of times an agent has transitioned to the Candidate state, after receive no heartbeat messages from the last known leader. | timeouts / interval | counter | | `consul.raft.verify_leader` | This metric doesn't have a direct correlation to the leader change. It just counts the number of times an agent checks if it is still the leader or not. For example, during every consistent read, the check is done. Depending on the load in the system, this metric count can be high as it is incremented each time a consistent read is completed. | checks / interval | Counter | | `consul.rpc.accept_conn` | Increments when a server accepts an RPC connection. | connections | counter | +| `consul.rpc.rate_limit.log_dropped` | Increments whenever a log that is emitted because an RPC exceeded a rate limit gets dropped because the output buffer is full. | log messages dropped | counter | | `consul.catalog.register` | Measures the time it takes to complete a catalog register operation. | ms | timer | | `consul.catalog.deregister` | Measures the time it takes to complete a catalog deregister operation. | ms | timer | | `consul.server.isLeader` | Track if a server is a leader(1) or not(0) | 1 or 0 | gauge |