mirror of https://github.com/status-im/consul.git
agent/txn_endpoint: configure max txn request length (#7388)
configure max transaction size separately from kv limit
This commit is contained in:
parent
0041102e29
commit
a8f4123d37
|
@ -968,6 +968,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||||
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
|
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
|
||||||
TaggedAddresses: c.TaggedAddresses,
|
TaggedAddresses: c.TaggedAddresses,
|
||||||
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
|
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
|
||||||
|
TxnMaxReqLen: b.uint64Val(c.Limits.TxnMaxReqLen),
|
||||||
UIDir: b.stringVal(c.UIDir),
|
UIDir: b.stringVal(c.UIDir),
|
||||||
UIContentPath: UIPathBuilder(b.stringVal(c.UIContentPath)),
|
UIContentPath: UIPathBuilder(b.stringVal(c.UIContentPath)),
|
||||||
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
|
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
|
||||||
|
|
|
@ -684,6 +684,7 @@ type Limits struct {
|
||||||
RPCMaxConnsPerClient *int `json:"rpc_max_conns_per_client,omitempty" hcl:"rpc_max_conns_per_client" mapstructure:"rpc_max_conns_per_client"`
|
RPCMaxConnsPerClient *int `json:"rpc_max_conns_per_client,omitempty" hcl:"rpc_max_conns_per_client" mapstructure:"rpc_max_conns_per_client"`
|
||||||
RPCRate *float64 `json:"rpc_rate,omitempty" hcl:"rpc_rate" mapstructure:"rpc_rate"`
|
RPCRate *float64 `json:"rpc_rate,omitempty" hcl:"rpc_rate" mapstructure:"rpc_rate"`
|
||||||
KVMaxValueSize *uint64 `json:"kv_max_value_size,omitempty" hcl:"kv_max_value_size" mapstructure:"kv_max_value_size"`
|
KVMaxValueSize *uint64 `json:"kv_max_value_size,omitempty" hcl:"kv_max_value_size" mapstructure:"kv_max_value_size"`
|
||||||
|
TxnMaxReqLen *uint64 `json:"txn_max_req_len,omitempty" hcl:"txn_max_req_len" mapstructure:"txn_max_req_len"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Segment struct {
|
type Segment struct {
|
||||||
|
|
|
@ -110,6 +110,7 @@ func DefaultSource() Source {
|
||||||
rpc_max_burst = 1000
|
rpc_max_burst = 1000
|
||||||
rpc_max_conns_per_client = 100
|
rpc_max_conns_per_client = 100
|
||||||
kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
|
kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
|
||||||
|
txn_max_req_len = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
|
||||||
}
|
}
|
||||||
performance = {
|
performance = {
|
||||||
leave_drain_time = "5s"
|
leave_drain_time = "5s"
|
||||||
|
|
|
@ -1452,6 +1452,12 @@ type RuntimeConfig struct {
|
||||||
// hcl: translate_wan_addrs = (true|false)
|
// hcl: translate_wan_addrs = (true|false)
|
||||||
TranslateWANAddrs bool
|
TranslateWANAddrs bool
|
||||||
|
|
||||||
|
// TxnMaxReqLen configures the upper limit for the size (in bytes) of the
|
||||||
|
// incoming request bodies for transactions to the /txn endpoint.
|
||||||
|
//
|
||||||
|
// hcl: limits { txn_max_req_len = uint64 }
|
||||||
|
TxnMaxReqLen uint64
|
||||||
|
|
||||||
// UIDir is the directory containing the Web UI resources.
|
// UIDir is the directory containing the Web UI resources.
|
||||||
// If provided, the UI endpoints will be enabled.
|
// If provided, the UI endpoints will be enabled.
|
||||||
//
|
//
|
||||||
|
|
|
@ -3870,7 +3870,8 @@ func TestFullConfig(t *testing.T) {
|
||||||
"rpc_rate": 12029.43,
|
"rpc_rate": 12029.43,
|
||||||
"rpc_max_burst": 44848,
|
"rpc_max_burst": 44848,
|
||||||
"rpc_max_conns_per_client": 2954,
|
"rpc_max_conns_per_client": 2954,
|
||||||
"kv_max_value_size": 1234567800000000
|
"kv_max_value_size": 1234567800000000,
|
||||||
|
"txn_max_req_len": 5678000000000000
|
||||||
},
|
},
|
||||||
"log_level": "k1zo9Spt",
|
"log_level": "k1zo9Spt",
|
||||||
"log_json": true,
|
"log_json": true,
|
||||||
|
@ -4500,6 +4501,7 @@ func TestFullConfig(t *testing.T) {
|
||||||
rpc_max_burst = 44848
|
rpc_max_burst = 44848
|
||||||
rpc_max_conns_per_client = 2954
|
rpc_max_conns_per_client = 2954
|
||||||
kv_max_value_size = 1234567800000000
|
kv_max_value_size = 1234567800000000
|
||||||
|
txn_max_req_len = 5678000000000000
|
||||||
}
|
}
|
||||||
log_level = "k1zo9Spt"
|
log_level = "k1zo9Spt"
|
||||||
log_json = true
|
log_json = true
|
||||||
|
@ -5573,6 +5575,7 @@ func TestFullConfig(t *testing.T) {
|
||||||
"wan_ipv4": "78.63.37.19",
|
"wan_ipv4": "78.63.37.19",
|
||||||
},
|
},
|
||||||
TranslateWANAddrs: true,
|
TranslateWANAddrs: true,
|
||||||
|
TxnMaxReqLen: 5678000000000000,
|
||||||
UIContentPath: "/consul/",
|
UIContentPath: "/consul/",
|
||||||
UIDir: "11IFzAUn",
|
UIDir: "11IFzAUn",
|
||||||
UnixSocketUser: "E0nB1DwA",
|
UnixSocketUser: "E0nB1DwA",
|
||||||
|
@ -5908,6 +5911,7 @@ func TestSanitize(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
KVMaxValueSize: 1234567800000000,
|
KVMaxValueSize: 1234567800000000,
|
||||||
|
TxnMaxReqLen: 5678000000000000,
|
||||||
}
|
}
|
||||||
|
|
||||||
rtJSON := `{
|
rtJSON := `{
|
||||||
|
@ -6217,6 +6221,7 @@ func TestSanitize(t *testing.T) {
|
||||||
"StatsiteAddr": ""
|
"StatsiteAddr": ""
|
||||||
},
|
},
|
||||||
"TranslateWANAddrs": false,
|
"TranslateWANAddrs": false,
|
||||||
|
"TxnMaxReqLen": 5678000000000000,
|
||||||
"UIDir": "",
|
"UIDir": "",
|
||||||
"UIContentPath": "",
|
"UIContentPath": "",
|
||||||
"UnixSocketGroup": "",
|
"UnixSocketGroup": "",
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -65,27 +64,49 @@ func isWrite(op api.KVOp) bool {
|
||||||
// a boolean, that if false means an error response has been generated and
|
// a boolean, that if false means an error response has been generated and
|
||||||
// processing should stop.
|
// processing should stop.
|
||||||
func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) {
|
func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (structs.TxnOps, int, bool) {
|
||||||
|
// The TxnMaxReqLen limit and KVMaxValueSize limit both default to the
|
||||||
|
// suggested raft data size and can be configured independently. The
|
||||||
|
// TxnMaxReqLen is enforced on the cumulative size of the transaction,
|
||||||
|
// whereas the KVMaxValueSize limit is imposed on the values of individual KV
|
||||||
|
// operations -- this is to keep consistent with the behavior for KV values
|
||||||
|
// in the kvs endpoint.
|
||||||
|
//
|
||||||
|
// The defaults are set to the suggested raft size to keep the total
|
||||||
|
// transaction size reasonable to account for timely heartbeat signals. If
|
||||||
|
// the TxnMaxReqLen limit is above the raft's suggested threshold, large
|
||||||
|
// transactions are automatically set to attempt a chunking apply.
|
||||||
|
// Performance may degrade and warning messages may appear.
|
||||||
|
maxTxnLen := int64(s.agent.config.TxnMaxReqLen)
|
||||||
|
kvMaxValueSize := int64(s.agent.config.KVMaxValueSize)
|
||||||
|
|
||||||
sizeStr := req.Header.Get("Content-Length")
|
// For backward compatibility, KVMaxValueSize is used as the max txn request
|
||||||
if sizeStr != "" {
|
// length if it is configured greater than TxnMaxReqLen or its default
|
||||||
if size, err := strconv.Atoi(sizeStr); err != nil {
|
if maxTxnLen < kvMaxValueSize {
|
||||||
resp.WriteHeader(http.StatusBadRequest)
|
maxTxnLen = kvMaxValueSize
|
||||||
fmt.Fprintf(resp, "Failed to parse Content-Length: %v", err)
|
}
|
||||||
return nil, 0, false
|
|
||||||
} else if size > int(s.agent.config.KVMaxValueSize) {
|
// Check Content-Length first before decoding to return early
|
||||||
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
if req.ContentLength > maxTxnLen {
|
||||||
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", s.agent.config.KVMaxValueSize)
|
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||||
return nil, 0, false
|
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", maxTxnLen)
|
||||||
}
|
return nil, 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note the body is in API format, and not the RPC format. If we can't
|
|
||||||
// decode it, we will return a 400 since we don't have enough context to
|
|
||||||
// associate the error with a given operation.
|
|
||||||
var ops api.TxnOps
|
var ops api.TxnOps
|
||||||
|
req.Body = http.MaxBytesReader(resp, req.Body, maxTxnLen)
|
||||||
if err := decodeBody(req.Body, &ops); err != nil {
|
if err := decodeBody(req.Body, &ops); err != nil {
|
||||||
resp.WriteHeader(http.StatusBadRequest)
|
if err.Error() == "http: request body too large" {
|
||||||
fmt.Fprintf(resp, "Failed to parse body: %v", err)
|
// The request size is also verified during decoding to double check
|
||||||
|
// if the Content-Length header was not set by the client.
|
||||||
|
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||||
|
fmt.Fprintf(resp, "Request body too large, max size: %v bytes", maxTxnLen)
|
||||||
|
} else {
|
||||||
|
// Note the body is in API format, and not the RPC format. If we can't
|
||||||
|
// decode it, we will return a 400 since we don't have enough context to
|
||||||
|
// associate the error with a given operation.
|
||||||
|
resp.WriteHeader(http.StatusBadRequest)
|
||||||
|
fmt.Fprintf(resp, "Failed to parse body: %v", err)
|
||||||
|
}
|
||||||
return nil, 0, false
|
return nil, 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,17 +125,15 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
|
||||||
// byte arrays so we can assign right over.
|
// byte arrays so we can assign right over.
|
||||||
var opsRPC structs.TxnOps
|
var opsRPC structs.TxnOps
|
||||||
var writes int
|
var writes int
|
||||||
var netKVSize uint64
|
|
||||||
for _, in := range ops {
|
for _, in := range ops {
|
||||||
switch {
|
switch {
|
||||||
case in.KV != nil:
|
case in.KV != nil:
|
||||||
size := len(in.KV.Value)
|
size := len(in.KV.Value)
|
||||||
if uint64(size) > s.agent.config.KVMaxValueSize {
|
if int64(size) > kvMaxValueSize {
|
||||||
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||||
fmt.Fprintf(resp, "Value for key %q is too large (%d > %d bytes)", in.KV.Key, size, s.agent.config.KVMaxValueSize)
|
fmt.Fprintf(resp, "Value for key %q is too large (%d > %d bytes)", in.KV.Key, size, s.agent.config.KVMaxValueSize)
|
||||||
return nil, 0, false
|
return nil, 0, false
|
||||||
}
|
}
|
||||||
netKVSize += uint64(size)
|
|
||||||
|
|
||||||
verb := in.KV.Verb
|
verb := in.KV.Verb
|
||||||
if isWrite(verb) {
|
if isWrite(verb) {
|
||||||
|
@ -257,15 +276,6 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enforce an overall size limit to help prevent abuse.
|
|
||||||
if netKVSize > s.agent.config.KVMaxValueSize {
|
|
||||||
resp.WriteHeader(http.StatusRequestEntityTooLarge)
|
|
||||||
fmt.Fprintf(resp, "Cumulative size of key data is too large (%d > %d bytes)",
|
|
||||||
netKVSize, s.agent.config.KVMaxValueSize)
|
|
||||||
|
|
||||||
return nil, 0, false
|
|
||||||
}
|
|
||||||
|
|
||||||
return opsRPC, writes, true
|
return opsRPC, writes, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -54,7 +53,6 @@ func TestTxnEndpoint_Bad_Size_Item(t *testing.T) {
|
||||||
]
|
]
|
||||||
`, value)))
|
`, value)))
|
||||||
req, _ := http.NewRequest("PUT", "/v1/txn", buf)
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf)
|
||||||
req.Header.Add("Content-Length", fmt.Sprintf("%d", buf.Len()))
|
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
if _, err := agent.srv.Txn(resp, req); err != nil {
|
if _, err := agent.srv.Txn(resp, req); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -67,14 +65,30 @@ func TestTxnEndpoint_Bad_Size_Item(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("toobig", func(t *testing.T) {
|
t.Run("exceeds default limits", func(t *testing.T) {
|
||||||
a := NewTestAgent(t, t.Name(), "")
|
a := NewTestAgent(t, t.Name(), "")
|
||||||
testIt(t, a, false)
|
testIt(t, a, false)
|
||||||
a.Shutdown()
|
a.Shutdown()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("exceeds configured max txn len", func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 700000 }")
|
||||||
|
testIt(t, a, false)
|
||||||
|
a.Shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("exceeds default max kv value size", func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 123456789 }")
|
||||||
|
testIt(t, a, false)
|
||||||
|
a.Shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("allowed", func(t *testing.T) {
|
t.Run("allowed", func(t *testing.T) {
|
||||||
a := NewTestAgent(t, t.Name(), "limits = { kv_max_value_size = 123456789 }")
|
a := NewTestAgent(t, t.Name(), `
|
||||||
|
limits = {
|
||||||
|
txn_max_req_len = 123456789
|
||||||
|
kv_max_value_size = 123456789
|
||||||
|
}`)
|
||||||
testIt(t, a, true)
|
testIt(t, a, true)
|
||||||
a.Shutdown()
|
a.Shutdown()
|
||||||
})
|
})
|
||||||
|
@ -124,13 +138,35 @@ func TestTxnEndpoint_Bad_Size_Net(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("toobig", func(t *testing.T) {
|
t.Run("exceeds default limits", func(t *testing.T) {
|
||||||
a := NewTestAgent(t, t.Name(), "")
|
a := NewTestAgent(t, t.Name(), "")
|
||||||
testIt(a, false)
|
testIt(a, false)
|
||||||
a.Shutdown()
|
a.Shutdown()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("exceeds configured max txn len", func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 700000 }")
|
||||||
|
testIt(a, false)
|
||||||
|
a.Shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("exceeds default max kv value size", func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, t.Name(), "limits = { txn_max_req_len = 123456789 }")
|
||||||
|
testIt(a, false)
|
||||||
|
a.Shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("allowed", func(t *testing.T) {
|
t.Run("allowed", func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, t.Name(), `
|
||||||
|
limits = {
|
||||||
|
txn_max_req_len = 123456789
|
||||||
|
kv_max_value_size = 123456789
|
||||||
|
}`)
|
||||||
|
testIt(a, true)
|
||||||
|
a.Shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("allowed kv max backward compatible", func(t *testing.T) {
|
||||||
a := NewTestAgent(t, t.Name(), "limits = { kv_max_value_size = 123456789 }")
|
a := NewTestAgent(t, t.Name(), "limits = { kv_max_value_size = 123456789 }")
|
||||||
testIt(a, true)
|
testIt(a, true)
|
||||||
a.Shutdown()
|
a.Shutdown()
|
||||||
|
@ -612,50 +648,3 @@ func TestTxnEndpoint_UpdateCheck(t *testing.T) {
|
||||||
}
|
}
|
||||||
assert.Equal(t, expected, txnResp)
|
assert.Equal(t, expected, txnResp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConvertOps_ContentLength(t *testing.T) {
|
|
||||||
a := NewTestAgent(t, t.Name(), "")
|
|
||||||
defer a.Shutdown()
|
|
||||||
|
|
||||||
jsonBody := `[
|
|
||||||
{
|
|
||||||
"KV": {
|
|
||||||
"Verb": "set",
|
|
||||||
"Key": "key1",
|
|
||||||
"Value": "aGVsbG8gd29ybGQ="
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]`
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
contentLength string
|
|
||||||
ok bool
|
|
||||||
}{
|
|
||||||
{"", true},
|
|
||||||
{strconv.Itoa(len(jsonBody)), true},
|
|
||||||
{strconv.Itoa(raft.SuggestedMaxDataSize), true},
|
|
||||||
{strconv.Itoa(raft.SuggestedMaxDataSize + 100), false},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range tests {
|
|
||||||
t.Run("contentLength: "+tc.contentLength, func(t *testing.T) {
|
|
||||||
resp := httptest.NewRecorder()
|
|
||||||
var body bytes.Buffer
|
|
||||||
|
|
||||||
// Doesn't matter what the request body size actually is, as we only
|
|
||||||
// check 'Content-Length' header in this test anyway.
|
|
||||||
body.WriteString(jsonBody)
|
|
||||||
|
|
||||||
req := httptest.NewRequest("POST", "http://foo.com", &body)
|
|
||||||
req.Header.Add("Content-Length", tc.contentLength)
|
|
||||||
|
|
||||||
_, _, ok := a.srv.convertOps(resp, req)
|
|
||||||
if ok != tc.ok {
|
|
||||||
t.Fatal("ok != tc.ok")
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -1449,6 +1449,14 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
||||||
single RPC call to a Consul server. See
|
single RPC call to a Consul server. See
|
||||||
https://en.wikipedia.org/wiki/Token_bucket for more details about how
|
https://en.wikipedia.org/wiki/Token_bucket for more details about how
|
||||||
token bucket rate limiters operate.
|
token bucket rate limiters operate.
|
||||||
|
* <a name="txn_max_req_len"></a><a href="#txn_max_req_len">
|
||||||
|
`txn_max_req_len`</a> - Configures the maximum number of
|
||||||
|
bytes for a transaction request body to the [`/v1/txn`](/api/txn.html)
|
||||||
|
endpoint. This limit defaults to [raft's](https://github.com/hashicorp/raft)
|
||||||
|
suggested max size. **Note that increasing beyond this default can
|
||||||
|
cause Consul to fail in unexpected ways**, it may potentially affect
|
||||||
|
leadership stability and prevent timely heartbeat signals by
|
||||||
|
increasing RPC IO duration.
|
||||||
|
|
||||||
* <a name="log_file"></a><a href="#log_file">`log_file`</a> Equivalent to the
|
* <a name="log_file"></a><a href="#log_file">`log_file`</a> Equivalent to the
|
||||||
[`-log-file` command-line flag](#_log_file).
|
[`-log-file` command-line flag](#_log_file).
|
||||||
|
|
Loading…
Reference in New Issue