mirror of https://github.com/status-im/consul.git
config: add field for enabling streaming RPC endpoint
This commit is contained in:
parent
b5b790d4c0
commit
b93577c94f
|
@ -1132,6 +1132,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
|
||||||
// copy it whatever the value.
|
// copy it whatever the value.
|
||||||
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
|
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
|
||||||
|
|
||||||
|
cfg.RPCConfig = runtimeCfg.RPCConfig
|
||||||
|
|
||||||
if runtimeCfg.LeaveDrainTime > 0 {
|
if runtimeCfg.LeaveDrainTime > 0 {
|
||||||
cfg.LeaveDrainTime = runtimeCfg.LeaveDrainTime
|
cfg.LeaveDrainTime = runtimeCfg.LeaveDrainTime
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,13 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-bexpr"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
|
"github.com/hashicorp/go-sockaddr/template"
|
||||||
|
"github.com/hashicorp/memberlist"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/connect/ca"
|
"github.com/hashicorp/consul/agent/connect/ca"
|
||||||
|
@ -30,12 +37,6 @@ import (
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-bexpr"
|
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/go-multierror"
|
|
||||||
"github.com/hashicorp/go-sockaddr/template"
|
|
||||||
"github.com/hashicorp/memberlist"
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Load will build the configuration including the extraHead source injected
|
// Load will build the configuration including the extraHead source injected
|
||||||
|
@ -1040,6 +1041,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||||
RPCMaxConnsPerClient: b.intVal(c.Limits.RPCMaxConnsPerClient),
|
RPCMaxConnsPerClient: b.intVal(c.Limits.RPCMaxConnsPerClient),
|
||||||
RPCProtocol: b.intVal(c.RPCProtocol),
|
RPCProtocol: b.intVal(c.RPCProtocol),
|
||||||
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
|
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
|
||||||
|
RPCConfig: consul.RPCConfig{EnableStreaming: b.boolVal(c.RPC.EnableStreaming)},
|
||||||
RaftProtocol: b.intVal(c.RaftProtocol),
|
RaftProtocol: b.intVal(c.RaftProtocol),
|
||||||
RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold),
|
RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold),
|
||||||
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
|
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
|
||||||
|
|
|
@ -4,9 +4,10 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/lib/decode"
|
|
||||||
"github.com/hashicorp/hcl"
|
"github.com/hashicorp/hcl"
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib/decode"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -257,6 +258,8 @@ type Config struct {
|
||||||
VerifyServerHostname *bool `json:"verify_server_hostname,omitempty" hcl:"verify_server_hostname" mapstructure:"verify_server_hostname"`
|
VerifyServerHostname *bool `json:"verify_server_hostname,omitempty" hcl:"verify_server_hostname" mapstructure:"verify_server_hostname"`
|
||||||
Watches []map[string]interface{} `json:"watches,omitempty" hcl:"watches" mapstructure:"watches"`
|
Watches []map[string]interface{} `json:"watches,omitempty" hcl:"watches" mapstructure:"watches"`
|
||||||
|
|
||||||
|
RPC RPC `mapstructure:"rpc"`
|
||||||
|
|
||||||
// This isn't used by Consul but we've documented a feature where users
|
// This isn't used by Consul but we've documented a feature where users
|
||||||
// can deploy their snapshot agent configs alongside their Consul configs
|
// can deploy their snapshot agent configs alongside their Consul configs
|
||||||
// so we have a placeholder here so it can be parsed but this doesn't
|
// so we have a placeholder here so it can be parsed but this doesn't
|
||||||
|
@ -796,3 +799,7 @@ type RawUIMetricsProxyAddHeader struct {
|
||||||
Name *string `json:"name,omitempty" hcl:"name" mapstructure:"name"`
|
Name *string `json:"name,omitempty" hcl:"name" mapstructure:"name"`
|
||||||
Value *string `json:"value,omitempty" hcl:"value" mapstructure:"value"`
|
Value *string `json:"value,omitempty" hcl:"value" mapstructure:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RPC struct {
|
||||||
|
EnableStreaming *bool `json:"enable_streaming" hcl:"enable_streaming" mapstructure:"enable_streaming"`
|
||||||
|
}
|
||||||
|
|
|
@ -7,7 +7,11 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-uuid"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
@ -15,8 +19,6 @@ import (
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-uuid"
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type RuntimeSOAConfig struct {
|
type RuntimeSOAConfig struct {
|
||||||
|
@ -933,6 +935,8 @@ type RuntimeConfig struct {
|
||||||
// hcl: protocol = int
|
// hcl: protocol = int
|
||||||
RPCProtocol int
|
RPCProtocol int
|
||||||
|
|
||||||
|
RPCConfig consul.RPCConfig
|
||||||
|
|
||||||
// RaftProtocol sets the Raft protocol version to use on this server.
|
// RaftProtocol sets the Raft protocol version to use on this server.
|
||||||
// Defaults to 3.
|
// Defaults to 3.
|
||||||
//
|
//
|
||||||
|
|
|
@ -18,15 +18,17 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type configTest struct {
|
type configTest struct {
|
||||||
|
@ -5113,6 +5115,7 @@ func TestFullConfig(t *testing.T) {
|
||||||
"retry_join_wan": [ "PFsR02Ye", "rJdQIhER" ],
|
"retry_join_wan": [ "PFsR02Ye", "rJdQIhER" ],
|
||||||
"retry_max": 913,
|
"retry_max": 913,
|
||||||
"retry_max_wan": 23160,
|
"retry_max_wan": 23160,
|
||||||
|
"rpc": {"enable_streaming": true},
|
||||||
"segment": "BC2NhTDi",
|
"segment": "BC2NhTDi",
|
||||||
"segments": [
|
"segments": [
|
||||||
{
|
{
|
||||||
|
@ -5797,6 +5800,9 @@ func TestFullConfig(t *testing.T) {
|
||||||
retry_join_wan = [ "PFsR02Ye", "rJdQIhER" ]
|
retry_join_wan = [ "PFsR02Ye", "rJdQIhER" ]
|
||||||
retry_max = 913
|
retry_max = 913
|
||||||
retry_max_wan = 23160
|
retry_max_wan = 23160
|
||||||
|
rpc {
|
||||||
|
enable_streaming = true
|
||||||
|
}
|
||||||
segment = "BC2NhTDi"
|
segment = "BC2NhTDi"
|
||||||
segments = [
|
segments = [
|
||||||
{
|
{
|
||||||
|
@ -6552,6 +6558,7 @@ func TestFullConfig(t *testing.T) {
|
||||||
RetryJoinMaxAttemptsLAN: 913,
|
RetryJoinMaxAttemptsLAN: 913,
|
||||||
RetryJoinMaxAttemptsWAN: 23160,
|
RetryJoinMaxAttemptsWAN: 23160,
|
||||||
RetryJoinWAN: []string{"PFsR02Ye", "rJdQIhER"},
|
RetryJoinWAN: []string{"PFsR02Ye", "rJdQIhER"},
|
||||||
|
RPCConfig: consul.RPCConfig{EnableStreaming: true},
|
||||||
SegmentName: "BC2NhTDi",
|
SegmentName: "BC2NhTDi",
|
||||||
Segments: []structs.NetworkSegment{
|
Segments: []structs.NetworkSegment{
|
||||||
{
|
{
|
||||||
|
@ -7461,6 +7468,9 @@ func TestSanitize(t *testing.T) {
|
||||||
"RPCMaxConnsPerClient": 0,
|
"RPCMaxConnsPerClient": 0,
|
||||||
"RPCProtocol": 0,
|
"RPCProtocol": 0,
|
||||||
"RPCRateLimit": 0,
|
"RPCRateLimit": 0,
|
||||||
|
"RPCConfig": {
|
||||||
|
"EnableStreaming": false
|
||||||
|
},
|
||||||
"RaftProtocol": 0,
|
"RaftProtocol": 0,
|
||||||
"RaftSnapshotInterval": "0s",
|
"RaftSnapshotInterval": "0s",
|
||||||
"RaftSnapshotThreshold": 0,
|
"RaftSnapshotThreshold": 0,
|
||||||
|
|
|
@ -6,6 +6,11 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/memberlist"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
"github.com/hashicorp/serf/serf"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -13,10 +18,6 @@ import (
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/consul/version"
|
"github.com/hashicorp/consul/version"
|
||||||
"github.com/hashicorp/memberlist"
|
|
||||||
"github.com/hashicorp/raft"
|
|
||||||
"github.com/hashicorp/serf/serf"
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -475,8 +476,7 @@ type Config struct {
|
||||||
// AutoEncrypt.Sign requests.
|
// AutoEncrypt.Sign requests.
|
||||||
AutoEncryptAllowTLS bool
|
AutoEncryptAllowTLS bool
|
||||||
|
|
||||||
// TODO: godoc, set this value from Agent
|
RPCConfig RPCConfig
|
||||||
EnableGRPCServer bool
|
|
||||||
|
|
||||||
// Embedded Consul Enterprise specific configuration
|
// Embedded Consul Enterprise specific configuration
|
||||||
*EnterpriseConfig
|
*EnterpriseConfig
|
||||||
|
@ -644,3 +644,10 @@ func DefaultConfig() *Config {
|
||||||
|
|
||||||
return conf
|
return conf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RPCConfig settings for the RPC server
|
||||||
|
//
|
||||||
|
// TODO: move many settings to this struct.
|
||||||
|
type RPCConfig struct {
|
||||||
|
EnableStreaming bool
|
||||||
|
}
|
||||||
|
|
|
@ -617,7 +617,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
|
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
|
||||||
if !config.EnableGRPCServer {
|
if !config.RPCConfig.EnableStreaming {
|
||||||
return agentgrpc.NoOpHandler{Logger: deps.Logger}
|
return agentgrpc.NoOpHandler{Logger: deps.Logger}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ There are four specific cases covered with increasing complexity:
|
||||||
state for client agent's RPC client.
|
state for client agent's RPC client.
|
||||||
- [ ] Add a test to `agent/agent_test.go` similar to others with prefix
|
- [ ] Add a test to `agent/agent_test.go` similar to others with prefix
|
||||||
`TestAgent_reloadConfig*`.
|
`TestAgent_reloadConfig*`.
|
||||||
- [ ] Add documentation to `website/source/docs/agent/options.html.md`.
|
- [ ] Add documentation to `website/pages/docs/agent/options.mdx`.
|
||||||
|
|
||||||
Done! You can now use your new field in a client agent by accessing
|
Done! You can now use your new field in a client agent by accessing
|
||||||
`s.agent.Config.<FieldName>`.
|
`s.agent.Config.<FieldName>`.
|
||||||
|
|
|
@ -1633,6 +1633,11 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
||||||
- `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though.
|
- `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though.
|
||||||
- `rpc_rate` - Configures the RPC rate limiter on Consul _clients_ by setting the maximum request rate that this agent is allowed to make for RPC requests to Consul servers, in requests per second. Defaults to infinite, which disables rate limiting.
|
- `rpc_rate` - Configures the RPC rate limiter on Consul _clients_ by setting the maximum request rate that this agent is allowed to make for RPC requests to Consul servers, in requests per second. Defaults to infinite, which disables rate limiting.
|
||||||
- `rpc_max_burst` - The size of the token bucket used to recharge the RPC rate limiter on Consul _clients_. Defaults to 1000 tokens, and each token is good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket for more details about how token bucket rate limiters operate.
|
- `rpc_max_burst` - The size of the token bucket used to recharge the RPC rate limiter on Consul _clients_. Defaults to 1000 tokens, and each token is good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket for more details about how token bucket rate limiters operate.
|
||||||
|
|
||||||
|
- `rpc.enable_streaming` - Enable the gRPC subscribe endpoint on a Consul Server. All
|
||||||
|
Servers in all connected datacenters must have this enabled before any client can use
|
||||||
|
streaming.
|
||||||
|
|
||||||
- `kv_max_value_size` - **(Advanced)** Configures the maximum number of bytes for a kv request body to the [`/v1/kv`](/api/kv) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration. This option affects the txn endpoint too, but Consul 1.7.2 introduced `txn_max_req_len` which is the preferred way to set the limit for the txn endpoint. If both limits are set, the higher one takes precedence.
|
- `kv_max_value_size` - **(Advanced)** Configures the maximum number of bytes for a kv request body to the [`/v1/kv`](/api/kv) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration. This option affects the txn endpoint too, but Consul 1.7.2 introduced `txn_max_req_len` which is the preferred way to set the limit for the txn endpoint. If both limits are set, the higher one takes precedence.
|
||||||
- `txn_max_req_len` - **(Advanced)** Configures the maximum number of bytes for a transaction request body to the [`/v1/txn`](/api/txn) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration.
|
- `txn_max_req_len` - **(Advanced)** Configures the maximum number of bytes for a transaction request body to the [`/v1/txn`](/api/txn) endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft) suggested max size (512KB). **Note that tuning these improperly can cause Consul to fail in unexpected ways**, it may potentially affect leadership stability and prevent timely heartbeat signals by increasing RPC IO duration.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue