From b5b790d4c0ae087c1de47959dfb087a69841e779 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 8 Oct 2020 12:10:38 -0400 Subject: [PATCH 1/3] streaming: Use a shorter LastGetTTL for the cache --- agent/cache-types/streaming_health_services.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index dc8a589ac1..139fc614fe 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -28,6 +28,19 @@ type StreamingHealthServices struct { deps MaterializerDeps } +// RegisterOptions returns options with a much shorter LastGetTTL than the default. +// Unlike other cache-types, StreamingHealthServices runs a materialized view in +// the background which will receive streamed events from a server. If the cache +// is not being used, that stream uses memory on the server and network transfer +// between the client and the server. +// The materialize view and the stream are stopped when the cache entry expires, +// so using a shorter TTL ensures the cache entry expires sooner. +func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions { + opts := c.RegisterOptionsBlockingRefresh.RegisterOptions() + opts.LastGetTTL = 10 * time.Minute + return opts +} + // NewStreamingHealthServices creates a cache-type for watching for service // health results via streaming updates. func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices { From b93577c94f5137440f35d554737dbb28696cd095 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 5 Oct 2020 16:28:13 -0400 Subject: [PATCH 2/3] config: add field for enabling streaming RPC endpoint --- agent/agent.go | 2 ++ agent/config/builder.go | 14 ++++++++------ agent/config/config.go | 9 ++++++++- agent/config/runtime.go | 8 ++++++-- agent/config/runtime_test.go | 12 +++++++++++- agent/consul/config.go | 19 +++++++++++++------ agent/consul/server.go | 2 +- .../checklist-adding-config-fields.md | 2 +- website/pages/docs/agent/options.mdx | 5 +++++ 9 files changed, 55 insertions(+), 18 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 9edeb53443..564d76711f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1132,6 +1132,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co // copy it whatever the value. cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout + cfg.RPCConfig = runtimeCfg.RPCConfig + if runtimeCfg.LeaveDrainTime > 0 { cfg.LeaveDrainTime = runtimeCfg.LeaveDrainTime } diff --git a/agent/config/builder.go b/agent/config/builder.go index 666ff0b655..1d2cdddfe1 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -16,6 +16,13 @@ import ( "strings" "time" + "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-sockaddr/template" + "github.com/hashicorp/memberlist" + "golang.org/x/time/rate" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/connect/ca" @@ -30,12 +37,6 @@ import ( "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-bexpr" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-multierror" - "github.com/hashicorp/go-sockaddr/template" - "github.com/hashicorp/memberlist" - "golang.org/x/time/rate" ) // Load will build the configuration including the extraHead source injected @@ -1040,6 +1041,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { RPCMaxConnsPerClient: b.intVal(c.Limits.RPCMaxConnsPerClient), RPCProtocol: b.intVal(c.RPCProtocol), RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)), + RPCConfig: consul.RPCConfig{EnableStreaming: b.boolVal(c.RPC.EnableStreaming)}, RaftProtocol: b.intVal(c.RaftProtocol), RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold), RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval), diff --git a/agent/config/config.go b/agent/config/config.go index b66bcd0d81..97f3cec2d3 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -4,9 +4,10 @@ import ( "encoding/json" "fmt" - "github.com/hashicorp/consul/lib/decode" "github.com/hashicorp/hcl" "github.com/mitchellh/mapstructure" + + "github.com/hashicorp/consul/lib/decode" ) const ( @@ -257,6 +258,8 @@ type Config struct { VerifyServerHostname *bool `json:"verify_server_hostname,omitempty" hcl:"verify_server_hostname" mapstructure:"verify_server_hostname"` Watches []map[string]interface{} `json:"watches,omitempty" hcl:"watches" mapstructure:"watches"` + RPC RPC `mapstructure:"rpc"` + // This isn't used by Consul but we've documented a feature where users // can deploy their snapshot agent configs alongside their Consul configs // so we have a placeholder here so it can be parsed but this doesn't @@ -796,3 +799,7 @@ type RawUIMetricsProxyAddHeader struct { Name *string `json:"name,omitempty" hcl:"name" mapstructure:"name"` Value *string `json:"value,omitempty" hcl:"value" mapstructure:"value"` } + +type RPC struct { + EnableStreaming *bool `json:"enable_streaming" hcl:"enable_streaming" mapstructure:"enable_streaming"` +} diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 34b870c2a0..8dc4b98664 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -7,7 +7,11 @@ import ( "strings" "time" + "github.com/hashicorp/go-uuid" + "golang.org/x/time/rate" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" @@ -15,8 +19,6 @@ import ( "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-uuid" - "golang.org/x/time/rate" ) type RuntimeSOAConfig struct { @@ -933,6 +935,8 @@ type RuntimeConfig struct { // hcl: protocol = int RPCProtocol int + RPCConfig consul.RPCConfig + // RaftProtocol sets the Raft protocol version to use on this server. // Defaults to 3. // diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 9a05364844..b4cf06b7c0 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -18,15 +18,17 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/checks" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/types" - "github.com/stretchr/testify/require" ) type configTest struct { @@ -5113,6 +5115,7 @@ func TestFullConfig(t *testing.T) { "retry_join_wan": [ "PFsR02Ye", "rJdQIhER" ], "retry_max": 913, "retry_max_wan": 23160, + "rpc": {"enable_streaming": true}, "segment": "BC2NhTDi", "segments": [ { @@ -5797,6 +5800,9 @@ func TestFullConfig(t *testing.T) { retry_join_wan = [ "PFsR02Ye", "rJdQIhER" ] retry_max = 913 retry_max_wan = 23160 + rpc { + enable_streaming = true + } segment = "BC2NhTDi" segments = [ { @@ -6552,6 +6558,7 @@ func TestFullConfig(t *testing.T) { RetryJoinMaxAttemptsLAN: 913, RetryJoinMaxAttemptsWAN: 23160, RetryJoinWAN: []string{"PFsR02Ye", "rJdQIhER"}, + RPCConfig: consul.RPCConfig{EnableStreaming: true}, SegmentName: "BC2NhTDi", Segments: []structs.NetworkSegment{ { @@ -7461,6 +7468,9 @@ func TestSanitize(t *testing.T) { "RPCMaxConnsPerClient": 0, "RPCProtocol": 0, "RPCRateLimit": 0, + "RPCConfig": { + "EnableStreaming": false + }, "RaftProtocol": 0, "RaftSnapshotInterval": "0s", "RaftSnapshotThreshold": 0, diff --git a/agent/consul/config.go b/agent/consul/config.go index 72d1fac058..f63fc7fac6 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -6,6 +6,11 @@ import ( "os" "time" + "github.com/hashicorp/memberlist" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" + "golang.org/x/time/rate" + "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" @@ -13,10 +18,6 @@ import ( "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/consul/version" - "github.com/hashicorp/memberlist" - "github.com/hashicorp/raft" - "github.com/hashicorp/serf/serf" - "golang.org/x/time/rate" ) const ( @@ -475,8 +476,7 @@ type Config struct { // AutoEncrypt.Sign requests. AutoEncryptAllowTLS bool - // TODO: godoc, set this value from Agent - EnableGRPCServer bool + RPCConfig RPCConfig // Embedded Consul Enterprise specific configuration *EnterpriseConfig @@ -644,3 +644,10 @@ func DefaultConfig() *Config { return conf } + +// RPCConfig settings for the RPC server +// +// TODO: move many settings to this struct. +type RPCConfig struct { + EnableStreaming bool +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 2bcd9e3a11..04323a97e6 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -617,7 +617,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { } func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler { - if !config.EnableGRPCServer { + if !config.RPCConfig.EnableStreaming { return agentgrpc.NoOpHandler{Logger: deps.Logger} } diff --git a/contributing/checklist-adding-config-fields.md b/contributing/checklist-adding-config-fields.md index fd33b5a47e..8ff16f110b 100644 --- a/contributing/checklist-adding-config-fields.md +++ b/contributing/checklist-adding-config-fields.md @@ -55,7 +55,7 @@ There are four specific cases covered with increasing complexity: state for client agent's RPC client. - [ ] Add a test to `agent/agent_test.go` similar to others with prefix `TestAgent_reloadConfig*`. - - [ ] Add documentation to `website/source/docs/agent/options.html.md`. + - [ ] Add documentation to `website/pages/docs/agent/options.mdx`. Done! You can now use your new field in a client agent by accessing `s.agent.Config.`. diff --git a/website/pages/docs/agent/options.mdx b/website/pages/docs/agent/options.mdx index a369400a8f..64dad15f06 100644 --- a/website/pages/docs/agent/options.mdx +++ b/website/pages/docs/agent/options.mdx @@ -1633,6 +1633,11 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though. - `rpc_rate` - Configures the RPC rate limiter on Consul _clients_ by setting the maximum request rate that this agent is allowed to make for RPC requests to Consul servers, in requests per second. Defaults to infinite, which disables rate limiting. - `rpc_max_burst` - The size of the token bucket used to recharge the RPC rate limiter on Consul _clients_. Defaults to 1000 tokens, and each token is good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket for more details about how token bucket rate limiters operate. + + - `rpc.enable_streaming` - Enable the gRPC subscribe endpoint on a Consul Server. All + Servers in all connected datacenters must have this enabled before any client can use + streaming. + - `kv_max_value_size` - **(Advanced)** Configures the maximum number of bytes for a kv request body to the [`/v1/kv`](/api/kv) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration. This option affects the txn endpoint too, but Consul 1.7.2 introduced `txn_max_req_len` which is the preferred way to set the limit for the txn endpoint. If both limits are set, the higher one takes precedence. - `txn_max_req_len` - **(Advanced)** Configures the maximum number of bytes for a transaction request body to the [`/v1/txn`](/api/txn) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration. From e7d505dc336a1509b17f5694dbad3a6a008caf9c Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 5 Oct 2020 17:31:35 -0400 Subject: [PATCH 3/3] config: add field for enabling streaming in the client agent: register the new streaming cache-type --- agent/agent.go | 15 ++++++++----- agent/config/builder.go | 2 ++ agent/config/config.go | 5 ++++- agent/config/runtime.go | 2 ++ agent/config/runtime_test.go | 26 ++++++++++++---------- agent/rpcclient/health/health.go | 8 +++---- agent/setup.go | 32 +++++++++++++++++++++++++--- website/pages/docs/agent/options.mdx | 26 ++++++++++++++-------- 8 files changed, 83 insertions(+), 33 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 564d76711f..351fde8ab4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -359,7 +359,11 @@ func New(bd BaseDeps) (*Agent, error) { cache: bd.Cache, } - a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a} + cacheName := cachetype.HealthServicesName + if bd.RuntimeConfig.CacheUseStreamingBackend { + cacheName = cachetype.StreamingHealthServicesName + } + a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a, CacheName: cacheName} a.serviceManager = NewServiceManager(&a) @@ -3675,10 +3679,11 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura } } -// registerCache configures the cache and registers all the supported -// types onto the cache. This is NOT safe to call multiple times so -// care should be taken to call this exactly once after the cache -// field has been initialized. +// registerCache types on a.cache. +// This function may only be called once from New. +// +// Note: this function no longer registered all cache-types. Newer cache-types +// that do not depend on Agent are registered from registerCacheTypes. func (a *Agent) registerCache() { // Note that you should register the _agent_ as the RPC implementation and not // the a.delegate directly, otherwise tests that rely on overriding RPC diff --git a/agent/config/builder.go b/agent/config/builder.go index 1d2cdddfe1..4d5a42dd4f 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1091,6 +1091,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { Watches: c.Watches, } + rt.CacheUseStreamingBackend = b.boolVal(c.Cache.UseStreamingBackend) + if rt.Cache.EntryFetchMaxBurst <= 0 { return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst) } diff --git a/agent/config/config.go b/agent/config/config.go index 97f3cec2d3..890e5c746d 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -97,12 +97,15 @@ func (l LiteralSource) Parse() (Config, mapstructure.Metadata, error) { return l.Config, mapstructure.Metadata{}, nil } -// Cache is the tunning configuration for cache, values are optional +// Cache configuration for the agent/cache. type Cache struct { // EntryFetchMaxBurst max burst size of RateLimit for a single cache entry EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"` // EntryFetchRate represents the max calls/sec for a single cache entry EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"` + // UseStreamingBackend instead of blocking queries to populate the cache. + // Only supported by some cache types. + UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"` } // Config defines the format of a configuration file in either JSON or diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 8dc4b98664..ad3d042fdb 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -937,6 +937,8 @@ type RuntimeConfig struct { RPCConfig consul.RPCConfig + CacheUseStreamingBackend bool + // RaftProtocol sets the Raft protocol version to use on this server. // Defaults to 3. // diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index b4cf06b7c0..990dc5fd76 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -4878,7 +4878,8 @@ func TestFullConfig(t *testing.T) { "bootstrap_expect": 53, "cache": { "entry_fetch_max_burst": 42, - "entry_fetch_rate": 0.334 + "entry_fetch_rate": 0.334, + "use_streaming_backend": true }, "ca_file": "erA7T0PM", "ca_path": "mQEN1Mfp", @@ -5561,6 +5562,7 @@ func TestFullConfig(t *testing.T) { cache = { entry_fetch_max_burst = 42 entry_fetch_rate = 0.334 + use_streaming_backend = true }, ca_file = "erA7T0PM" ca_path = "mQEN1Mfp" @@ -6854,16 +6856,17 @@ func TestFullConfig(t *testing.T) { }, }, }, - SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"), - SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"), - SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"), - SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"), - SerfAllowedCIDRsLAN: []net.IPNet{}, - SerfAllowedCIDRsWAN: []net.IPNet{}, - SessionTTLMin: 26627 * time.Second, - SkipLeaveOnInt: true, - StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"}, - StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"}, + CacheUseStreamingBackend: true, + SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"), + SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"), + SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"), + SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"), + SerfAllowedCIDRsLAN: []net.IPNet{}, + SerfAllowedCIDRsWAN: []net.IPNet{}, + SessionTTLMin: 26627 * time.Second, + SkipLeaveOnInt: true, + StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"}, + StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"}, Telemetry: lib.TelemetryConfig{ CirconusAPIApp: "p4QOTe9j", CirconusAPIToken: "E3j35V23", @@ -7501,6 +7504,7 @@ func TestSanitize(t *testing.T) { "SerfBindAddrWAN": "", "SerfPortLAN": 0, "SerfPortWAN": 0, + "CacheUseStreamingBackend": false, "ServerMode": false, "ServerName": "", "ServerPort": 0, diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 4c8d5f4d8d..09fe452ab6 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -4,13 +4,13 @@ import ( "context" "github.com/hashicorp/consul/agent/cache" - cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" ) type Client struct { - NetRPC NetRPC - Cache CacheGetter + NetRPC NetRPC + Cache CacheGetter + CacheName string } type NetRPC interface { @@ -51,7 +51,7 @@ func (c *Client) getServiceNodes( return out, cache.ResultMeta{}, err } - raw, md, err := c.Cache.Get(ctx, cachetype.HealthServicesName, &req) + raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) if err != nil { return out, md, err } diff --git a/agent/setup.go b/agent/setup.go index 213ef304ea..7c65777c9c 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -7,8 +7,12 @@ import ( "net/http" "time" + "github.com/hashicorp/go-hclog" + "google.golang.org/grpc/grpclog" + autoconf "github.com/hashicorp/consul/agent/auto-config" "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/grpc" @@ -19,9 +23,8 @@ import ( "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/go-hclog" - "google.golang.org/grpc/grpclog" ) // TODO: BaseDeps should be renamed in the future once more of Agent.Start @@ -84,7 +87,6 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) d.Cache = cache.New(cfg.Cache) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) - // TODO(streaming): setConfig.Scheme name for tests builder := resolver.NewServerResolverBuilder(resolver.Config{}) resolver.RegisterWithGRPC(builder) d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper())) @@ -105,9 +107,33 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) return d, err } + if err := registerCacheTypes(d); err != nil { + return d, err + } + return d, nil } +// registerCacheTypes on bd.Cache. +// +// Note: most cache types are still registered in Agent.registerCache. This +// function is for registering newer cache-types which no longer have a dependency +// on Agent. +func registerCacheTypes(bd BaseDeps) error { + if bd.RuntimeConfig.CacheUseStreamingBackend { + conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) + if err != nil { + return err + } + matDeps := cachetype.MaterializerDeps{ + Client: pbsubscribe.NewStateChangeSubscriptionClient(conn), + Logger: bd.Logger, + } + bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps)) + } + return nil +} + func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool { var rpcSrcAddr *net.TCPAddr if !ipaddr.IsAny(config.RPCBindAddr) { diff --git a/website/pages/docs/agent/options.mdx b/website/pages/docs/agent/options.mdx index 64dad15f06..d178a0ee43 100644 --- a/website/pages/docs/agent/options.mdx +++ b/website/pages/docs/agent/options.mdx @@ -1128,14 +1128,14 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." -- `cache` Cache configuration of agent. The configurable values are the following: +- `cache` configuration for client agents. The configurable values are the following: - - `entry_fetch_max_burst`: The size of the token bucket used to recharge the rate-limit per + - `entry_fetch_max_burst` The size of the token bucket used to recharge the rate-limit per cache entry. The default value is 2 and means that when cache has not been updated for a long time, 2 successive queries can be made as long as the rate-limit is not reached. - - `entry_fetch_rate`: configures the rate-limit at which the cache may refresh a single + - `entry_fetch_rate` configures the rate-limit at which the cache may refresh a single entry. On a cluster with many changes/s, watching changes in the cache might put high pressure on the servers. This ensures the number of requests for a single cache entry will never go beyond this limit, even when a given service changes every 1/100s. @@ -1146,6 +1146,13 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." The default value is "No limit" and should be tuned on large clusters to avoid performing too many RPCs on entries changing a lot. + - `use_streaming_backend` when enabled Consul client agents will use streaming rpc to + populate the cache, instead of the traditional blocking queries. All servers must + have [`rpc.enable_streaming`](#rpc_enable_streaming) enabled before any client can enable `use_streaming_backend`. + At least one of [`dns.use_cache`](#dns_use_cache) or + [`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise + this setting has no effect. + - `ca_file` This provides a file path to a PEM-encoded certificate authority. The certificate authority is used to check the authenticity of client and server connections with the appropriate [`verify_incoming`](#verify_incoming) @@ -1618,7 +1625,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `allow_write_http_from` This object is a list of networks in CIDR notation (eg "127.0.0.0/8") that are allowed to call the agent write endpoints. It defaults to an empty list, which means all networks are allowed. This is used to make the agent read-only, except for select ip ranges. - To block write calls from anywhere, use `[ "255.255.255.255/32" ]`. - To only allow write calls from localhost, use `[ "127.0.0.0/8" ]` - To only allow specific IPs, use `[ "10.0.0.1/32", "10.0.0.2/32" ]` - - `use_cache` Defaults to true. If disabled, the agent won't be using [agent caching](/api/features/caching) to answer the request. Even when the url parameter is provided. + - `use_cache` ((#http_config_use_cache)) Defaults to true. If disabled, the agent won't be using [agent caching](/api/features/caching) to answer the request. Even when the url parameter is provided. - `leave_on_terminate` If enabled, when the agent receives a TERM signal, it will send a `Leave` message to the rest of the cluster and gracefully leave. The default behavior for this feature varies based on whether or not the agent is running as a client or a server (prior to Consul 0.7 the default value was unconditionally set to `false`). On agents in client-mode, this defaults to `true` and for agents in server-mode, this defaults to `false`. @@ -1633,11 +1640,6 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though. - `rpc_rate` - Configures the RPC rate limiter on Consul _clients_ by setting the maximum request rate that this agent is allowed to make for RPC requests to Consul servers, in requests per second. Defaults to infinite, which disables rate limiting. - `rpc_max_burst` - The size of the token bucket used to recharge the RPC rate limiter on Consul _clients_. Defaults to 1000 tokens, and each token is good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket for more details about how token bucket rate limiters operate. - - - `rpc.enable_streaming` - Enable the gRPC subscribe endpoint on a Consul Server. All - Servers in all connected datacenters must have this enabled before any client can use - streaming. - - `kv_max_value_size` - **(Advanced)** Configures the maximum number of bytes for a kv request body to the [`/v1/kv`](/api/kv) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration. This option affects the txn endpoint too, but Consul 1.7.2 introduced `txn_max_req_len` which is the preferred way to set the limit for the txn endpoint. If both limits are set, the higher one takes precedence. - `txn_max_req_len` - **(Advanced)** Configures the maximum number of bytes for a transaction request body to the [`/v1/txn`](/api/txn) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration. @@ -1820,6 +1822,12 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `retry_interval_wan` Equivalent to the [`-retry-interval-wan` command-line flag](#_retry_interval_wan). +- `rpc` configuration for Consul servers. + + - `enable_streaming` ((#rpc_enable_streaming)) enables the gRPC subscribe endpoint on a Consul Server. All + servers in all federated datacenters must have this enabled before any client can use + [`cache.use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul. + - `segment` - Equivalent to the [`-segment` command-line flag](#_segment). - `segments` - This is a list of nested objects