mirror of https://github.com/status-im/consul.git
Security fixes (#7182)
* Mitigate HTTP/RPC Services Allow Unbounded Resource Usage Fixes #7159. Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com> Co-authored-by: Paul Banks <banks@banksco.de>
This commit is contained in:
parent
d5f9268222
commit
5531678e9e
|
@ -18,6 +18,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
|
@ -305,6 +306,10 @@ type Agent struct {
|
|||
// store within the data directory. This will prevent loading while writing as
|
||||
// well as multiple concurrent writes.
|
||||
persistedTokensLock sync.RWMutex
|
||||
|
||||
// httpConnLimiter is used to limit connections to the HTTP server by client
|
||||
// IP.
|
||||
httpConnLimiter connlimit.Limiter
|
||||
}
|
||||
|
||||
// New verifies the configuration given has a Datacenter and DataDir
|
||||
|
@ -524,6 +529,11 @@ func (a *Agent) Start() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Configure the http connection limiter.
|
||||
a.httpConnLimiter.SetConfig(connlimit.Config{
|
||||
MaxConnsPerClientIP: a.config.HTTPMaxConnsPerClient,
|
||||
})
|
||||
|
||||
// Create listeners and unstarted servers; see comment on listenHTTP why
|
||||
// we are doing this.
|
||||
servers, err := a.listenHTTP()
|
||||
|
@ -866,6 +876,7 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) {
|
|||
tlscfg = a.tlsConfigurator.IncomingHTTPSConfig()
|
||||
l = tls.NewListener(l, tlscfg)
|
||||
}
|
||||
|
||||
srv := &HTTPServer{
|
||||
Server: &http.Server{
|
||||
Addr: l.Addr().String(),
|
||||
|
@ -878,13 +889,37 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) {
|
|||
}
|
||||
srv.Server.Handler = srv.handler(a.config.EnableDebug)
|
||||
|
||||
// This will enable upgrading connections to HTTP/2 as
|
||||
// part of TLS negotiation.
|
||||
// Load the connlimit helper into the server
|
||||
connLimitFn := a.httpConnLimiter.HTTPConnStateFunc()
|
||||
|
||||
if proto == "https" {
|
||||
// Enforce TLS handshake timeout
|
||||
srv.Server.ConnState = func(conn net.Conn, state http.ConnState) {
|
||||
switch state {
|
||||
case http.StateNew:
|
||||
// Set deadline to prevent slow send before TLS handshake or first
|
||||
// byte of request.
|
||||
conn.SetReadDeadline(time.Now().Add(a.config.HTTPSHandshakeTimeout))
|
||||
case http.StateActive:
|
||||
// Clear read deadline. We should maybe set read timeouts more
|
||||
// generally but that's a bigger task as some HTTP endpoints may
|
||||
// stream large requests and responses (e.g. snapshot) so we can't
|
||||
// set sensible blanket timeouts here.
|
||||
conn.SetReadDeadline(time.Time{})
|
||||
}
|
||||
// Pass through to conn limit. This is OK because we didn't change
|
||||
// state (i.e. Close conn).
|
||||
connLimitFn(conn, state)
|
||||
}
|
||||
|
||||
// This will enable upgrading connections to HTTP/2 as
|
||||
// part of TLS negotiation.
|
||||
err = http2.ConfigureServer(srv.Server, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
srv.Server.ConnState = connLimitFn
|
||||
}
|
||||
|
||||
ln = append(ln, l)
|
||||
|
@ -1252,10 +1287,18 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
|
|||
base.RPCMaxBurst = a.config.RPCMaxBurst
|
||||
}
|
||||
|
||||
// RPC-related performance configs.
|
||||
if a.config.RPCHoldTimeout > 0 {
|
||||
base.RPCHoldTimeout = a.config.RPCHoldTimeout
|
||||
// RPC timeouts/limits.
|
||||
if a.config.RPCHandshakeTimeout > 0 {
|
||||
base.RPCHandshakeTimeout = a.config.RPCHandshakeTimeout
|
||||
}
|
||||
if a.config.RPCMaxConnsPerClient > 0 {
|
||||
base.RPCMaxConnsPerClient = a.config.RPCMaxConnsPerClient
|
||||
}
|
||||
|
||||
// RPC-related performance configs. We allow explicit zero value to disable so
|
||||
// copy it whatever the value.
|
||||
base.RPCHoldTimeout = a.config.RPCHoldTimeout
|
||||
|
||||
if a.config.LeaveDrainTime > 0 {
|
||||
base.LeaveDrainTime = a.config.LeaveDrainTime
|
||||
}
|
||||
|
@ -3970,6 +4013,10 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
|||
|
||||
a.loadLimits(newCfg)
|
||||
|
||||
a.httpConnLimiter.SetConfig(connlimit.Config{
|
||||
MaxConnsPerClientIP: newCfg.HTTPMaxConnsPerClient,
|
||||
})
|
||||
|
||||
for _, s := range a.dnsServers {
|
||||
if err := s.ReloadConfig(newCfg); err != nil {
|
||||
return fmt.Errorf("Failed reloading dns config : %v", err)
|
||||
|
@ -3979,7 +4026,7 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
|||
// this only gets used by the consulConfig function and since
|
||||
// that is only ever done during init and reload here then
|
||||
// an in place modification is safe as reloads cannot be
|
||||
// concurrent due to both gaing a full lock on the stateLock
|
||||
// concurrent due to both gaining a full lock on the stateLock
|
||||
a.config.ConfigEntryBootstrap = newCfg.ConfigEntryBootstrap
|
||||
|
||||
// create the config for the rpc server/client
|
||||
|
|
|
@ -906,6 +906,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
|||
EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
|
||||
GRPCPort: grpcPort,
|
||||
GRPCAddrs: grpcAddrs,
|
||||
HTTPMaxConnsPerClient: b.intVal(c.Limits.HTTPMaxConnsPerClient),
|
||||
HTTPSHandshakeTimeout: b.durationVal("limits.https_handshake_timeout", c.Limits.HTTPSHandshakeTimeout),
|
||||
KeyFile: b.stringVal(c.KeyFile),
|
||||
KVMaxValueSize: b.uint64Val(c.Limits.KVMaxValueSize),
|
||||
LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
|
||||
|
@ -925,8 +927,10 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
|||
PrimaryDatacenter: primaryDatacenter,
|
||||
RPCAdvertiseAddr: rpcAdvertiseAddr,
|
||||
RPCBindAddr: rpcBindAddr,
|
||||
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
|
||||
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
|
||||
RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst),
|
||||
RPCMaxConnsPerClient: b.intVal(c.Limits.RPCMaxConnsPerClient),
|
||||
RPCProtocol: b.intVal(c.RPCProtocol),
|
||||
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
|
||||
RaftProtocol: b.intVal(c.RaftProtocol),
|
||||
|
|
|
@ -675,9 +675,13 @@ type UnixSocket struct {
|
|||
}
|
||||
|
||||
type Limits struct {
|
||||
RPCMaxBurst *int `json:"rpc_max_burst,omitempty" hcl:"rpc_max_burst" mapstructure:"rpc_max_burst"`
|
||||
RPCRate *float64 `json:"rpc_rate,omitempty" hcl:"rpc_rate" mapstructure:"rpc_rate"`
|
||||
KVMaxValueSize *uint64 `json:"kv_max_value_size,omitempty" hcl:"kv_max_value_size" mapstructure:"kv_max_value_size"`
|
||||
HTTPMaxConnsPerClient *int `json:"http_max_conns_per_client,omitempty" hcl:"http_max_conns_per_client" mapstructure:"http_max_conns_per_client"`
|
||||
HTTPSHandshakeTimeout *string `json:"https_handshake_timeout,omitempty" hcl:"https_handshake_timeout" mapstructure:"https_handshake_timeout"`
|
||||
RPCHandshakeTimeout *string `json:"rpc_handshake_timeout,omitempty" hcl:"rpc_handshake_timeout" mapstructure:"rpc_handshake_timeout"`
|
||||
RPCMaxBurst *int `json:"rpc_max_burst,omitempty" hcl:"rpc_max_burst" mapstructure:"rpc_max_burst"`
|
||||
RPCMaxConnsPerClient *int `json:"rpc_max_conns_per_client,omitempty" hcl:"rpc_max_conns_per_client" mapstructure:"rpc_max_conns_per_client"`
|
||||
RPCRate *float64 `json:"rpc_rate,omitempty" hcl:"rpc_rate" mapstructure:"rpc_rate"`
|
||||
KVMaxValueSize *uint64 `json:"kv_max_value_size,omitempty" hcl:"kv_max_value_size" mapstructure:"kv_max_value_size"`
|
||||
}
|
||||
|
||||
type Segment struct {
|
||||
|
|
|
@ -103,8 +103,12 @@ func DefaultSource() Source {
|
|||
recursor_timeout = "2s"
|
||||
}
|
||||
limits = {
|
||||
http_max_conns_per_client = 100
|
||||
https_handshake_timeout = "5s"
|
||||
rpc_handshake_timeout = "5s"
|
||||
rpc_rate = -1
|
||||
rpc_max_burst = 1000
|
||||
rpc_max_conns_per_client = 100
|
||||
kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
|
||||
}
|
||||
performance = {
|
||||
|
|
|
@ -803,6 +803,18 @@ type RuntimeConfig struct {
|
|||
// hcl: client_addr = string addresses { https = string } ports { https = int }
|
||||
HTTPSAddrs []net.Addr
|
||||
|
||||
// HTTPMaxConnsPerClient limits the number of concurrent TCP connections the
|
||||
// HTTP(S) server will accept from any single source IP address.
|
||||
//
|
||||
// hcl: limits{ http_max_conns_per_client = 100 }
|
||||
HTTPMaxConnsPerClient int
|
||||
|
||||
// HTTPSHandshakeTimeout is the time allowed for HTTPS client to complete the
|
||||
// TLS handshake and send first bytes of the request.
|
||||
//
|
||||
// hcl: limits{ https_handshake_timeout = "5s" }
|
||||
HTTPSHandshakeTimeout time.Duration
|
||||
|
||||
// HTTPSPort is the port the HTTP server listens on. The default is -1.
|
||||
// Setting this to a value <= 0 disables the endpoint.
|
||||
//
|
||||
|
@ -927,6 +939,15 @@ type RuntimeConfig struct {
|
|||
// hcl: bind_addr = string ports { server = int }
|
||||
RPCBindAddr *net.TCPAddr
|
||||
|
||||
// RPCHandshakeTimeout is the timeout for reading the initial magic byte on a
|
||||
// new RPC connection. If this is set high it may allow unauthenticated users
|
||||
// to hold connections open arbitrarily long, even when mutual TLS is being
|
||||
// enforced. It may be set to 0 explicitly to disable the timeout but this
|
||||
// should never be used in production. Default is 5 seconds.
|
||||
//
|
||||
// hcl: limits { rpc_handshake_timeout = "duration" }
|
||||
RPCHandshakeTimeout time.Duration
|
||||
|
||||
// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
|
||||
// This is used to paper over a loss of leadership by instead holding RPCs,
|
||||
// so that the caller experiences a slow response rather than an error.
|
||||
|
@ -949,6 +970,12 @@ type RuntimeConfig struct {
|
|||
RPCRateLimit rate.Limit
|
||||
RPCMaxBurst int
|
||||
|
||||
// RPCMaxConnsPerClient limits the number of concurrent TCP connections the
|
||||
// RPC server will accept from any single source IP address.
|
||||
//
|
||||
// hcl: limits{ rpc_max_conns_per_client = 100 }
|
||||
RPCMaxConnsPerClient int
|
||||
|
||||
// RPCProtocol is the Consul protocol version to use.
|
||||
//
|
||||
// hcl: protocol = int
|
||||
|
|
|
@ -3455,6 +3455,28 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
|
|||
}
|
||||
},
|
||||
},
|
||||
|
||||
///////////////////////////////////
|
||||
// Defaults sanity checks
|
||||
|
||||
{
|
||||
desc: "default limits",
|
||||
args: []string{
|
||||
`-data-dir=` + dataDir,
|
||||
},
|
||||
patch: func(rt *RuntimeConfig) {
|
||||
rt.DataDir = dataDir
|
||||
// Note that in the happy case this test will pass even if you comment
|
||||
// out all the stuff below since rt is also initialized from the
|
||||
// defaults. But it's still valuable as it will fail as soon as the
|
||||
// defaults are changed from these values forcing that change to be
|
||||
// intentional.
|
||||
rt.RPCHandshakeTimeout = 5 * time.Second
|
||||
rt.HTTPSHandshakeTimeout = 5 * time.Second
|
||||
rt.HTTPMaxConnsPerClient = 100
|
||||
rt.RPCMaxConnsPerClient = 100
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testConfig(t, tests, dataDir)
|
||||
|
@ -3871,8 +3893,12 @@ func TestFullConfig(t *testing.T) {
|
|||
"key_file": "IEkkwgIA",
|
||||
"leave_on_terminate": true,
|
||||
"limits": {
|
||||
"http_max_conns_per_client": 9283,
|
||||
"https_handshake_timeout": "2391ms",
|
||||
"rpc_handshake_timeout": "1932ms",
|
||||
"rpc_rate": 12029.43,
|
||||
"rpc_max_burst": 44848,
|
||||
"rpc_max_conns_per_client": 2954,
|
||||
"kv_max_value_size": 1234567800000000
|
||||
},
|
||||
"log_level": "k1zo9Spt",
|
||||
|
@ -4477,8 +4503,12 @@ func TestFullConfig(t *testing.T) {
|
|||
key_file = "IEkkwgIA"
|
||||
leave_on_terminate = true
|
||||
limits {
|
||||
http_max_conns_per_client = 9283
|
||||
https_handshake_timeout = "2391ms"
|
||||
rpc_handshake_timeout = "1932ms"
|
||||
rpc_rate = 12029.43
|
||||
rpc_max_burst = 44848
|
||||
rpc_max_conns_per_client = 2954
|
||||
kv_max_value_size = 1234567800000000
|
||||
}
|
||||
log_level = "k1zo9Spt"
|
||||
|
@ -5170,6 +5200,8 @@ func TestFullConfig(t *testing.T) {
|
|||
HTTPPort: 7999,
|
||||
HTTPResponseHeaders: map[string]string{"M6TKa9NP": "xjuxjOzQ", "JRCrHZed": "rl0mTx81"},
|
||||
HTTPSAddrs: []net.Addr{tcpAddr("95.17.17.19:15127")},
|
||||
HTTPMaxConnsPerClient: 9283,
|
||||
HTTPSHandshakeTimeout: 2391 * time.Millisecond,
|
||||
HTTPSPort: 15127,
|
||||
KeyFile: "IEkkwgIA",
|
||||
KVMaxValueSize: 1234567800000000,
|
||||
|
@ -5186,10 +5218,12 @@ func TestFullConfig(t *testing.T) {
|
|||
PrimaryDatacenter: "ejtmd43d",
|
||||
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
|
||||
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
|
||||
RPCHandshakeTimeout: 1932 * time.Millisecond,
|
||||
RPCHoldTimeout: 15707 * time.Second,
|
||||
RPCProtocol: 30793,
|
||||
RPCRateLimit: 12029.43,
|
||||
RPCMaxBurst: 44848,
|
||||
RPCMaxConnsPerClient: 2954,
|
||||
RaftProtocol: 19016,
|
||||
RaftSnapshotThreshold: 16384,
|
||||
RaftSnapshotInterval: 30 * time.Second,
|
||||
|
@ -6039,9 +6073,11 @@ func TestSanitize(t *testing.T) {
|
|||
"unix:///var/run/foo"
|
||||
],
|
||||
"HTTPBlockEndpoints": [],
|
||||
"HTTPMaxConnsPerClient": 0,
|
||||
"HTTPPort": 0,
|
||||
"HTTPResponseHeaders": {},
|
||||
"HTTPSAddrs": [],
|
||||
"HTTPSHandshakeTimeout": "0s",
|
||||
"HTTPSPort": 0,
|
||||
"KeyFile": "hidden",
|
||||
"KVMaxValueSize": 1234567800000000,
|
||||
|
@ -6062,8 +6098,10 @@ func TestSanitize(t *testing.T) {
|
|||
"PrimaryDatacenter": "",
|
||||
"RPCAdvertiseAddr": "",
|
||||
"RPCBindAddr": "",
|
||||
"RPCHandshakeTimeout": "0s",
|
||||
"RPCHoldTimeout": "0s",
|
||||
"RPCMaxBurst": 0,
|
||||
"RPCMaxConnsPerClient": 0,
|
||||
"RPCProtocol": 0,
|
||||
"RPCRateLimit": 0,
|
||||
"RaftProtocol": 0,
|
||||
|
|
|
@ -388,6 +388,12 @@ type Config struct {
|
|||
// CheckOutputMaxSize control the max size of output of checks
|
||||
CheckOutputMaxSize int
|
||||
|
||||
// RPCHandshakeTimeout limits how long we will wait for the initial magic byte
|
||||
// on an RPC client connection. It also governs how long we will wait for a
|
||||
// TLS handshake when TLS is configured however the timout applies separately
|
||||
// for the initial magic byte and the TLS handshake and inner magic byte.
|
||||
RPCHandshakeTimeout time.Duration
|
||||
|
||||
// RPCHoldTimeout is how long an RPC can be "held" before it is errored.
|
||||
// This is used to paper over a loss of leadership by instead holding RPCs,
|
||||
// so that the caller experiences a slow response rather than an error.
|
||||
|
@ -406,6 +412,10 @@ type Config struct {
|
|||
RPCRate rate.Limit
|
||||
RPCMaxBurst int
|
||||
|
||||
// RPCMaxConnsPerClient is the limit of how many concurrent connections are
|
||||
// allowed from a single source IP.
|
||||
RPCMaxConnsPerClient int
|
||||
|
||||
// LeaveDrainTime is used to wait after a server has left the LAN Serf
|
||||
// pool for RPCs to drain and new requests to be sent to other servers.
|
||||
LeaveDrainTime time.Duration
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
connlimit "github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-raftchunking"
|
||||
|
@ -64,6 +65,16 @@ func (s *Server) listen(listener net.Listener) {
|
|||
continue
|
||||
}
|
||||
|
||||
free, err := s.rpcConnLimiter.Accept(conn)
|
||||
if err != nil {
|
||||
s.rpcLogger().Error("rejecting RPC conn from %s"+
|
||||
" rpc_max_conns_per_client exceeded", conn.RemoteAddr().String())
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
// Wrap conn so it will be auto-freed from conn limiter when it closes.
|
||||
conn = connlimit.Wrap(conn, free)
|
||||
|
||||
go s.handleConn(conn, false)
|
||||
metrics.IncrCounter([]string{"rpc", "accept_conn"}, 1)
|
||||
}
|
||||
|
@ -78,8 +89,17 @@ func logConn(conn net.Conn) string {
|
|||
// handleConn is used to determine if this is a Raft or
|
||||
// Consul type RPC connection and invoke the correct handler
|
||||
func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
||||
|
||||
// Read a single byte
|
||||
buf := make([]byte, 1)
|
||||
|
||||
// Limit how long the client can hold the connection open before they send the
|
||||
// magic byte (and authenticate when mTLS is enabled). If `isTLS == true` then
|
||||
// this also enforces a timeout on how long it takes for the handshake to
|
||||
// complete since tls.Conn.Read implicitly calls Handshake().
|
||||
if s.config.RPCHandshakeTimeout > 0 {
|
||||
conn.SetReadDeadline(time.Now().Add(s.config.RPCHandshakeTimeout))
|
||||
}
|
||||
if _, err := conn.Read(buf); err != nil {
|
||||
if err != io.EOF {
|
||||
s.rpcLogger().Error("failed to read byte",
|
||||
|
@ -92,6 +112,12 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
|||
}
|
||||
typ := pool.RPCType(buf[0])
|
||||
|
||||
// Reset the deadline as we aren't sure what is expected next - it depends on
|
||||
// the protocol.
|
||||
if s.config.RPCHandshakeTimeout > 0 {
|
||||
conn.SetReadDeadline(time.Time{})
|
||||
}
|
||||
|
||||
// Enforce TLS if VerifyIncoming is set
|
||||
if s.tlsConfigurator.VerifyIncomingRPC() && !isTLS && typ != pool.RPCTLS && typ != pool.RPCTLSInsecure {
|
||||
s.rpcLogger().Warn("Non-TLS connection attempted with VerifyIncoming set", "conn", logConn(conn))
|
||||
|
@ -109,6 +135,12 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
|||
s.raftLayer.Handoff(conn)
|
||||
|
||||
case pool.RPCTLS:
|
||||
// Don't allow malicious client to create TLS-in-TLS for ever.
|
||||
if isTLS {
|
||||
s.rpcLogger().Error("TLS connection attempting to establish inner TLS connection %s", logConn(conn))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
conn = tls.Server(conn, s.tlsConfigurator.IncomingRPCConfig())
|
||||
s.handleConn(conn, true)
|
||||
|
||||
|
@ -119,6 +151,12 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
|||
s.handleSnapshotConn(conn)
|
||||
|
||||
case pool.RPCTLSInsecure:
|
||||
// Don't allow malicious client to create TLS-in-TLS for ever.
|
||||
if isTLS {
|
||||
s.rpcLogger().Error("TLS connection attempting to establish inner TLS connection %s", logConn(conn))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
conn = tls.Server(conn, s.tlsConfigurator.IncomingInsecureRPCConfig())
|
||||
s.handleInsecureConn(conn)
|
||||
|
||||
|
|
|
@ -2,16 +2,19 @@ package consul
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -315,3 +318,331 @@ func TestRPC_ReadyForConsistentReads(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestRPC_MagicByteTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.RPCHandshakeTimeout = 10 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Connect to the server with bare TCP to simulate a malicious client trying
|
||||
// to hold open resources.
|
||||
addr := s1.config.RPCAdvertise
|
||||
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// Wait for more than the timeout. This is timing dependent so could fail if
|
||||
// the CPU is super overloaded so the handler goroutine so I'm using a retry
|
||||
// loop below to be sure but this feels like a pretty generous margin for
|
||||
// error (10x the timeout and 100ms of scheduling time).
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Set a read deadline on the Conn in case the timeout is not working we don't
|
||||
// want the read below to block forever. Needs to be much longer than what we
|
||||
// expect and the error should be different too.
|
||||
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Sanity check the conn was closed by attempting to read from it (a write
|
||||
// might not detect the close).
|
||||
buf := make([]byte, 10)
|
||||
_, err = conn.Read(buf)
|
||||
require.Error(r, err)
|
||||
require.Contains(r, err.Error(), "EOF")
|
||||
})
|
||||
}
|
||||
|
||||
func TestRPC_TLSHandshakeTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.RPCHandshakeTimeout = 10 * time.Millisecond
|
||||
c.UseTLS = true
|
||||
c.CAFile = "../../test/hostname/CertAuth.crt"
|
||||
c.CertFile = "../../test/hostname/Alice.crt"
|
||||
c.KeyFile = "../../test/hostname/Alice.key"
|
||||
c.VerifyServerHostname = true
|
||||
c.VerifyOutgoing = true
|
||||
c.VerifyIncoming = true
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Connect to the server with TLS magic byte delivered on time
|
||||
addr := s1.config.RPCAdvertise
|
||||
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// Write TLS byte to avoid being closed by either the (outer) first byte
|
||||
// timeout or the fact that server requires TLS
|
||||
_, err = conn.Write([]byte{pool.RPCTLS})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for more than the timeout before we start a TLS handshake. This is
|
||||
// timing dependent so could fail if the CPU is super overloaded so the
|
||||
// handler goroutine so I'm using a retry loop below to be sure but this feels
|
||||
// like a pretty generous margin for error (10x the timeout and 100ms of
|
||||
// scheduling time).
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Set a read deadline on the Conn in case the timeout is not working we don't
|
||||
// want the read below to block forever. Needs to be much longer than what we
|
||||
// expect and the error should be different too.
|
||||
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Sanity check the conn was closed by attempting to read from it (a write
|
||||
// might not detect the close).
|
||||
buf := make([]byte, 10)
|
||||
_, err = conn.Read(buf)
|
||||
require.Error(r, err)
|
||||
require.Contains(r, err.Error(), "EOF")
|
||||
})
|
||||
}
|
||||
|
||||
func TestRPC_PreventsTLSNesting(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
outerByte pool.RPCType
|
||||
innerByte pool.RPCType
|
||||
wantClose bool
|
||||
}{
|
||||
{
|
||||
// Base case, sanity check normal RPC in TLS works
|
||||
name: "RPC in TLS",
|
||||
outerByte: pool.RPCTLS,
|
||||
innerByte: pool.RPCConsul,
|
||||
wantClose: false,
|
||||
},
|
||||
{
|
||||
// Nested TLS-in-TLS
|
||||
name: "TLS in TLS",
|
||||
outerByte: pool.RPCTLS,
|
||||
innerByte: pool.RPCTLS,
|
||||
wantClose: true,
|
||||
},
|
||||
{
|
||||
// Nested TLS-in-TLS
|
||||
name: "TLS in Insecure TLS",
|
||||
outerByte: pool.RPCTLSInsecure,
|
||||
innerByte: pool.RPCTLS,
|
||||
wantClose: true,
|
||||
},
|
||||
{
|
||||
// Nested TLS-in-TLS
|
||||
name: "Insecure TLS in TLS",
|
||||
outerByte: pool.RPCTLS,
|
||||
innerByte: pool.RPCTLSInsecure,
|
||||
wantClose: true,
|
||||
},
|
||||
{
|
||||
// Nested TLS-in-TLS
|
||||
name: "Insecure TLS in Insecure TLS",
|
||||
outerByte: pool.RPCTLSInsecure,
|
||||
innerByte: pool.RPCTLSInsecure,
|
||||
wantClose: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.UseTLS = true
|
||||
c.CAFile = "../../test/hostname/CertAuth.crt"
|
||||
c.CertFile = "../../test/hostname/Alice.crt"
|
||||
c.KeyFile = "../../test/hostname/Alice.key"
|
||||
c.VerifyServerHostname = true
|
||||
c.VerifyOutgoing = true
|
||||
c.VerifyIncoming = false // saves us getting client cert setup
|
||||
c.Domain = "consul"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Connect to the server with TLS magic byte delivered on time
|
||||
addr := s1.config.RPCAdvertise
|
||||
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// Write Outer magic byte
|
||||
_, err = conn.Write([]byte{byte(tc.outerByte)})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start tls client
|
||||
tlsWrap := s1.tlsConfigurator.OutgoingRPCWrapper()
|
||||
tlsConn, err := tlsWrap("dc1", conn)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write Inner magic byte
|
||||
_, err = tlsConn.Write([]byte{byte(tc.innerByte)})
|
||||
require.NoError(t, err)
|
||||
|
||||
if tc.wantClose {
|
||||
// Allow up to a second for a read failure to indicate conn was closed by
|
||||
// server.
|
||||
conn.SetReadDeadline(time.Now().Add(1 * time.Second))
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Sanity check the conn was closed by attempting to read from it (a
|
||||
// write might not detect the close).
|
||||
buf := make([]byte, 10)
|
||||
_, err = tlsConn.Read(buf)
|
||||
require.Error(r, err)
|
||||
require.Contains(r, err.Error(), "EOF")
|
||||
})
|
||||
} else {
|
||||
// Set a shorter read deadline that should typically be enough to detect
|
||||
// immediate close but will also not make test hang forever. This
|
||||
// positive case is mostly just a sanity check that the test code here
|
||||
// is actually not failing just due to some other error in the way we
|
||||
// setup TLS. It also sanity checks that we still allow valid TLS conns
|
||||
// but if it produces possible false-positives in CI sometimes that's
|
||||
// not such a huge deal - CI won't be brittle and it will have done it's
|
||||
// job as a sanity check most of the time.
|
||||
conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
|
||||
buf := make([]byte, 10)
|
||||
_, err = tlsConn.Read(buf)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "i/o timeout")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func connectClient(t *testing.T, s1 *Server, mb pool.RPCType, useTLS, wantOpen bool, message string) net.Conn {
|
||||
t.Helper()
|
||||
|
||||
addr := s1.config.RPCAdvertise
|
||||
tlsWrap := s1.tlsConfigurator.OutgoingRPCWrapper()
|
||||
|
||||
conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write magic byte so we aren't timed out
|
||||
outerByte := mb
|
||||
if useTLS {
|
||||
outerByte = pool.RPCTLS
|
||||
}
|
||||
_, err = conn.Write([]byte{byte(outerByte)})
|
||||
require.NoError(t, err)
|
||||
|
||||
if useTLS {
|
||||
tlsConn, err := tlsWrap(s1.config.Datacenter, conn)
|
||||
// Subtly, tlsWrap will NOT actually do a handshake in this case - it only
|
||||
// does so for some configs, so even if the server closed the conn before
|
||||
// handshake this won't fail and it's only when we attempt to read or write
|
||||
// that we'll see the broken pipe.
|
||||
require.NoError(t, err, "%s: wanted open conn, failed TLS handshake: %s",
|
||||
message, err)
|
||||
conn = tlsConn
|
||||
|
||||
// Write Inner magic byte
|
||||
_, err = conn.Write([]byte{byte(mb)})
|
||||
if !wantOpen {
|
||||
// TLS Handshake will be done on this attempt to write and should fail
|
||||
require.Error(t, err, "%s: wanted closed conn, TLS Handshake succeeded", message)
|
||||
} else {
|
||||
require.NoError(t, err, "%s: wanted open conn, failed writing inner magic byte: %s",
|
||||
message, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the conn is in the state we want.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Don't wait around as server won't be sending data but the read will fail
|
||||
// immediately if the conn is closed.
|
||||
conn.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
|
||||
buf := make([]byte, 10)
|
||||
_, err := conn.Read(buf)
|
||||
require.Error(r, err)
|
||||
if wantOpen {
|
||||
require.Contains(r, err.Error(), "i/o timeout",
|
||||
"%s: wanted an open conn (read timeout)", message)
|
||||
} else {
|
||||
if useTLS {
|
||||
require.Error(r, err)
|
||||
// TLS may fail during either read or write of the handshake so there
|
||||
// are a few different errors that come up.
|
||||
if !strings.Contains(err.Error(), "read: connection reset by peer") &&
|
||||
!strings.Contains(err.Error(), "write: connection reset by peer") &&
|
||||
!strings.Contains(err.Error(), "write: broken pipe") {
|
||||
r.Fatalf("%s: wanted closed conn got err: %s", message, err)
|
||||
}
|
||||
} else {
|
||||
require.Contains(r, err.Error(), "EOF", "%s: wanted a closed conn",
|
||||
message)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
func TestRPC_RPCMaxConnsPerClient(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
magicByte pool.RPCType
|
||||
tlsEnabled bool
|
||||
}{
|
||||
{"RPC", pool.RPCMultiplexV2, false},
|
||||
{"RPC TLS", pool.RPCMultiplexV2, true},
|
||||
{"Raft", pool.RPCRaft, false},
|
||||
{"Raft TLS", pool.RPCRaft, true},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.RPCMaxConnsPerClient = 2
|
||||
if tc.tlsEnabled {
|
||||
c.UseTLS = true
|
||||
c.CAFile = "../../test/hostname/CertAuth.crt"
|
||||
c.CertFile = "../../test/hostname/Alice.crt"
|
||||
c.KeyFile = "../../test/hostname/Alice.key"
|
||||
c.VerifyServerHostname = true
|
||||
c.VerifyOutgoing = true
|
||||
c.VerifyIncoming = false // saves us getting client cert setup
|
||||
c.Domain = "consul"
|
||||
}
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
// Connect to the server with bare TCP
|
||||
conn1 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn1")
|
||||
defer conn1.Close()
|
||||
|
||||
// Two conns should succeed
|
||||
conn2 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn2")
|
||||
defer conn2.Close()
|
||||
|
||||
// Third should be closed byt the limiter
|
||||
conn3 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, false, "conn3")
|
||||
defer conn3.Close()
|
||||
|
||||
// If we close one of the earlier ones, we should be able to open another
|
||||
conn1.Close()
|
||||
conn4 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn4")
|
||||
defer conn4.Close()
|
||||
|
||||
// Reload config with higher limit
|
||||
newCfg := *s1.config
|
||||
newCfg.RPCMaxConnsPerClient = 10
|
||||
require.NoError(t, s1.ReloadConfig(&newCfg))
|
||||
|
||||
// Now another conn should be allowed
|
||||
conn5 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn5")
|
||||
defer conn5.Close()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
connlimit "github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/raft"
|
||||
|
@ -206,6 +207,9 @@ type Server struct {
|
|||
// from an agent.
|
||||
rpcLimiter atomic.Value
|
||||
|
||||
// rpcConnLimiter limits the number of RPC connections from a single source IP
|
||||
rpcConnLimiter connlimit.Limiter
|
||||
|
||||
// Listener is used to listen for incoming connections
|
||||
Listener net.Listener
|
||||
rpcServer *rpc.Server
|
||||
|
@ -749,6 +753,10 @@ func registerEndpoint(fn factory) {
|
|||
|
||||
// setupRPC is used to setup the RPC listener
|
||||
func (s *Server) setupRPC() error {
|
||||
s.rpcConnLimiter.SetConfig(connlimit.Config{
|
||||
MaxConnsPerClientIP: s.config.RPCMaxConnsPerClient,
|
||||
})
|
||||
|
||||
for _, fn := range endpoints {
|
||||
s.rpcServer.Register(fn(s))
|
||||
}
|
||||
|
@ -1258,6 +1266,9 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
|
|||
// relevant configuration information
|
||||
func (s *Server) ReloadConfig(config *Config) error {
|
||||
s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
|
||||
s.rpcConnLimiter.SetConfig(connlimit.Config{
|
||||
MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
|
||||
})
|
||||
|
||||
if s.IsLeader() {
|
||||
// only bootstrap the config entries if we are the leader
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
tokenStore "github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -1252,3 +1253,139 @@ func jsonReader(v interface{}) io.Reader {
|
|||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func TestHTTPServer_HandshakeTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Fire up an agent with TLS enabled.
|
||||
a := NewTestAgentWithFields(t, true, TestAgent{
|
||||
UseTLS: true,
|
||||
HCL: `
|
||||
key_file = "../test/client_certs/server.key"
|
||||
cert_file = "../test/client_certs/server.crt"
|
||||
ca_file = "../test/client_certs/rootca.crt"
|
||||
|
||||
limits {
|
||||
https_handshake_timeout = "10ms"
|
||||
}
|
||||
`,
|
||||
})
|
||||
defer a.Shutdown()
|
||||
|
||||
// Connect to it with a plain TCP client that doesn't attempt to send HTTP or
|
||||
// complete a TLS handshake.
|
||||
conn, err := net.Dial("tcp", a.srv.ln.Addr().String())
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
// Wait for more than the timeout. This is timing dependent so could fail if
|
||||
// the CPU is super overloaded so the handler goroutine so I'm using a retry
|
||||
// loop below to be sure but this feels like a pretty generous margin for
|
||||
// error (10x the timeout and 100ms of scheduling time).
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Set a read deadline on the Conn in case the timeout is not working we don't
|
||||
// want the read below to block forever. Needs to be much longer than what we
|
||||
// expect and the error should be different too.
|
||||
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Sanity check the conn was closed by attempting to read from it (a write
|
||||
// might not detect the close).
|
||||
buf := make([]byte, 10)
|
||||
_, err = conn.Read(buf)
|
||||
require.Error(r, err)
|
||||
require.Contains(r, err.Error(), "EOF")
|
||||
})
|
||||
}
|
||||
|
||||
func TestRPC_HTTPSMaxConnsPerClient(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
tlsEnabled bool
|
||||
}{
|
||||
{"HTTP", false},
|
||||
{"HTTPS", true},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
hclPrefix := ""
|
||||
if tc.tlsEnabled {
|
||||
hclPrefix = `
|
||||
key_file = "../test/client_certs/server.key"
|
||||
cert_file = "../test/client_certs/server.crt"
|
||||
ca_file = "../test/client_certs/rootca.crt"
|
||||
`
|
||||
}
|
||||
|
||||
// Fire up an agent with TLS enabled.
|
||||
a := NewTestAgentWithFields(t, true, TestAgent{
|
||||
UseTLS: tc.tlsEnabled,
|
||||
HCL: hclPrefix + `
|
||||
limits {
|
||||
http_max_conns_per_client = 2
|
||||
}
|
||||
`,
|
||||
})
|
||||
defer a.Shutdown()
|
||||
|
||||
addr := a.srv.ln.Addr()
|
||||
|
||||
assertConn := func(conn net.Conn, wantOpen bool) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Don't wait around as server won't be sending data but the read will fail
|
||||
// immediately if the conn is closed.
|
||||
conn.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
|
||||
buf := make([]byte, 10)
|
||||
_, err := conn.Read(buf)
|
||||
require.Error(r, err)
|
||||
if wantOpen {
|
||||
require.Contains(r, err.Error(), "i/o timeout",
|
||||
"wanted an open conn (read timeout)")
|
||||
} else {
|
||||
require.Contains(r, err.Error(), "EOF", "wanted a closed conn")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Connect to the server with bare TCP
|
||||
conn1, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
defer conn1.Close()
|
||||
|
||||
assertConn(conn1, true)
|
||||
|
||||
// Two conns should succeed
|
||||
conn2, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
defer conn2.Close()
|
||||
|
||||
assertConn(conn2, true)
|
||||
|
||||
// Third should succeed negotiating TCP handshake...
|
||||
conn3, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
defer conn3.Close()
|
||||
|
||||
// But then be closed.
|
||||
assertConn(conn3, false)
|
||||
|
||||
// Reload config with higher limit
|
||||
newCfg := *a.config
|
||||
newCfg.HTTPMaxConnsPerClient = 10
|
||||
require.NoError(t, a.ReloadConfig(&newCfg))
|
||||
|
||||
// Now another conn should be allowed
|
||||
conn4, err := net.DialTimeout("tcp", addr.String(), time.Second)
|
||||
require.NoError(t, err)
|
||||
defer conn4.Close()
|
||||
|
||||
assertConn(conn4, true)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -32,6 +32,7 @@ require (
|
|||
github.com/hashicorp/go-bexpr v0.1.2
|
||||
github.com/hashicorp/go-checkpoint v0.0.0-20171009173528-1545e56e46de
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1
|
||||
github.com/hashicorp/go-connlimit v0.1.0
|
||||
github.com/hashicorp/go-discover v0.0.0-20191202160150-7ec2cfbda7a2
|
||||
github.com/hashicorp/go-hclog v0.12.0
|
||||
github.com/hashicorp/go-memdb v1.0.3
|
||||
|
@ -70,6 +71,7 @@ require (
|
|||
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880
|
||||
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
|
||||
github.com/spf13/pflag v1.0.3 // indirect
|
||||
github.com/stretchr/objx v0.1.1 // indirect
|
||||
github.com/stretchr/testify v1.4.0
|
||||
golang.org/x/crypto v0.0.0-20191106202628-ed6320f186d4
|
||||
golang.org/x/net v0.0.0-20190923162816-aa69164e4478
|
||||
|
|
6
go.sum
6
go.sum
|
@ -122,6 +122,8 @@ github.com/hashicorp/go-checkpoint v0.0.0-20171009173528-1545e56e46de/go.mod h1:
|
|||
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
github.com/hashicorp/go-connlimit v0.1.0 h1:j8XwYaCBgm7n82jaQpaUeaDlsJH1x5/ycAJhlkrXv7E=
|
||||
github.com/hashicorp/go-connlimit v0.1.0/go.mod h1:OUj9FGL1tPIhl/2RCfzYHrIiWj+VVPGNyVPnUX8AqS0=
|
||||
github.com/hashicorp/go-discover v0.0.0-20191202160150-7ec2cfbda7a2 h1:r7GtRT+VXoM5WqHMxSVDIKgVCfK9T8CoS51RDKeOjBM=
|
||||
github.com/hashicorp/go-discover v0.0.0-20191202160150-7ec2cfbda7a2/go.mod h1:NnH5X4UCBEBdTuK2L8s4e4ilJm3UmGX0bANHCz0HSs0=
|
||||
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
|
||||
|
@ -345,6 +347,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
|
|||
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||
|
@ -474,6 +478,8 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
|
|||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
# Go Server Client Connection Tracking
|
||||
|
||||
This package provides a library for network servers to track how many
|
||||
concurrent connections they have from a given client address.
|
||||
|
||||
It's designed to be very simple and shared between several HashiCorp products
|
||||
that provide network servers and need this kind of control to impose limits on
|
||||
the resources that can be consumed by a single client.
|
||||
|
||||
## Usage
|
||||
|
||||
### TCP Server
|
||||
|
||||
```
|
||||
// During server setup:
|
||||
s.limiter = NewLimiter(Config{
|
||||
MaxConnsPerClientIP: 10,
|
||||
})
|
||||
|
||||
```
|
||||
|
||||
```
|
||||
// handleConn is called in its own goroutine for each net.Conn accepted by
|
||||
// a net.Listener.
|
||||
func (s *Server) handleConn(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
// Track the connection
|
||||
free, err := s.limiter.Accept(conn)
|
||||
if err != nil {
|
||||
// Not accepted as limit has been reached (or some other error), log error
|
||||
// or warning and close.
|
||||
|
||||
// The standard err.Error() message when limit is reached is generic so it
|
||||
// doesn't leak information which may potentially be sensitive (e.g. current
|
||||
// limits set or number of connections). This also allows comparison to
|
||||
// ErrPerClientIPLimitReached if it's important to handle it differently
|
||||
// from an internal library or io error (currently not possible but might be
|
||||
// in the future if additional functionality is added).
|
||||
|
||||
// If you would like to log more information about the current limit that
|
||||
// can be obtained with s.limiter.Config().
|
||||
return
|
||||
}
|
||||
// Defer a call to free to decrement the counter for this client IP once we
|
||||
// are done with this conn.
|
||||
defer free()
|
||||
|
||||
|
||||
// Handle the conn
|
||||
}
|
||||
```
|
||||
|
||||
### HTTP Server
|
||||
|
||||
```
|
||||
lim := NewLimiter(Config{
|
||||
MaxConnsPerClientIP: 10,
|
||||
})
|
||||
s := http.Server{
|
||||
// Other config here
|
||||
ConnState: lim.HTTPConnStateFunc(),
|
||||
}
|
||||
```
|
||||
|
||||
### Dynamic Configuration
|
||||
|
||||
The limiter supports dynamic reconfiguration. At any time, any goroutine may
|
||||
call `limiter.SetConfig(c Config)` which will atomically update the config. All
|
||||
subsequent calls to `Accept` will use the newly configured limits in their
|
||||
decisions and calls to `limiter.Config()` will return the new config.
|
||||
|
||||
Note that if the limits are reduced that will only prevent further connections
|
||||
beyond the new limit - existing connections are not actively closed to meet the
|
||||
limit. In cases where this is critical it's often preferable to mitigate in a
|
||||
more focussed way e.g. by adding an iptables rule that blocks all connections
|
||||
from one malicious client without affecting the whole server.
|
|
@ -0,0 +1,180 @@
|
|||
package connlimit
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrPerClientIPLimitReached is returned if accepting a new conn would exceed
|
||||
// the per-client-ip limit set.
|
||||
ErrPerClientIPLimitReached = errors.New("client connection limit reached")
|
||||
)
|
||||
|
||||
// Limiter implements a simple limiter that tracks the number of connections
|
||||
// from each client IP. It may be used in it's zero value although no limits
|
||||
// will be configured initially - they can be set later with SetConfig.
|
||||
type Limiter struct {
|
||||
// cs stores the map of active connections by IP address. We store a set of
|
||||
// conn pointers not just a counter because http.Server.ConnState hook only
|
||||
// gives us a connection object between calls so we need to know if a closed
|
||||
// conn is one that was previously accepted or one we've just closed in the
|
||||
// ConnState hook because the client has hit its limit.
|
||||
cs map[string]map[net.Conn]struct{}
|
||||
|
||||
// l protects access to cs
|
||||
l sync.Mutex
|
||||
|
||||
// cfg is stored atomically to provide non-blocking reads via Config. This
|
||||
// might be important if this is called regularly in a health or metrics
|
||||
// endpoint and shouldn't block new connections being established.
|
||||
cfg atomic.Value
|
||||
}
|
||||
|
||||
// Config is the configuration for the limiter.
|
||||
type Config struct {
|
||||
// MaxConnsPerClientIP limits how many concurrent connections are allowed from
|
||||
// a given client IP. The IP is the one reported by the connection so cannot
|
||||
// be relied upon if clients are connecting through multiple proxies or able
|
||||
// to spoof their source IP address in some way. Similarly, multiple clients
|
||||
// connected via a proxy or NAT gateway or similar will all be seen as coming
|
||||
// from the same IP and so limited as one client.
|
||||
MaxConnsPerClientIP int
|
||||
}
|
||||
|
||||
// NewLimiter returns a limiter with the specified config.
|
||||
func NewLimiter(cfg Config) *Limiter {
|
||||
l := &Limiter{}
|
||||
l.SetConfig(cfg)
|
||||
return l
|
||||
}
|
||||
|
||||
// Accept is called as early as possible when handling a new conn. If the
|
||||
// connection should be accepted according to the Limiter's Config, it will
|
||||
// return a free func and nil error. The free func must be called when the
|
||||
// connection is no longer being handled - typically in a defer statement in the
|
||||
// main connection handling goroutine, this will decrement the counter for that
|
||||
// client IP. If the configured limit has been reached, a no-op func is returned
|
||||
// (doesn't need to be called), and ErrPerClientIPLimitReached is returned.
|
||||
//
|
||||
// If any other error is returned it signifies something wrong with the config
|
||||
// or transient failure to read or parse the remote IP. The free func will be a
|
||||
// no-op in this case and need not be called.
|
||||
func (l *Limiter) Accept(conn net.Conn) (func(), error) {
|
||||
addrKey := addrKey(conn)
|
||||
|
||||
// Load config outside locked section since it's not updated under lock anyway
|
||||
// and the atomic Load might be slower/contented so better to do outside lock.
|
||||
cfg := l.Config()
|
||||
|
||||
l.l.Lock()
|
||||
defer l.l.Unlock()
|
||||
|
||||
if l.cs == nil {
|
||||
l.cs = make(map[string]map[net.Conn]struct{})
|
||||
}
|
||||
|
||||
cs := l.cs[addrKey]
|
||||
if cs == nil {
|
||||
cs = make(map[net.Conn]struct{})
|
||||
l.cs[addrKey] = cs
|
||||
}
|
||||
|
||||
n := len(cs)
|
||||
|
||||
// Might be greater since config is dynamic.
|
||||
if cfg.MaxConnsPerClientIP > 0 && n >= cfg.MaxConnsPerClientIP {
|
||||
return func() {}, ErrPerClientIPLimitReached
|
||||
}
|
||||
|
||||
// Add the conn to the map
|
||||
cs[conn] = struct{}{}
|
||||
|
||||
// Create a free func over the address key we used
|
||||
free := func() {
|
||||
l.freeConn(conn)
|
||||
}
|
||||
|
||||
return free, nil
|
||||
}
|
||||
|
||||
func addrKey(conn net.Conn) string {
|
||||
addr := conn.RemoteAddr()
|
||||
switch a := addr.(type) {
|
||||
case *net.TCPAddr:
|
||||
return "ip:" + a.IP.String()
|
||||
case *net.UDPAddr:
|
||||
return "ip:" + a.IP.String()
|
||||
case *net.IPAddr:
|
||||
return "ip:" + a.IP.String()
|
||||
default:
|
||||
// not sure what to do with this, just assume whole Addr is relevant?
|
||||
return addr.Network() + "/" + addr.String()
|
||||
}
|
||||
}
|
||||
|
||||
// freeConn removes a connection from the map if it's present. It is a no-op if
|
||||
// the conn was never accepted by Accept.
|
||||
func (l *Limiter) freeConn(conn net.Conn) {
|
||||
addrKey := addrKey(conn)
|
||||
|
||||
l.l.Lock()
|
||||
defer l.l.Unlock()
|
||||
|
||||
cs, ok := l.cs[addrKey]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(cs, conn)
|
||||
if len(cs) == 0 {
|
||||
delete(l.cs, addrKey)
|
||||
}
|
||||
}
|
||||
|
||||
// Config returns the current limiter configuration. It is safe to call from any
|
||||
// goroutine and does not block new connections being accepted.
|
||||
func (l *Limiter) Config() Config {
|
||||
cfgRaw := l.cfg.Load()
|
||||
if cfg, ok := cfgRaw.(Config); ok {
|
||||
return cfg
|
||||
}
|
||||
return Config{}
|
||||
}
|
||||
|
||||
// SetConfig dynamically updates the limiter configuration. It is safe to call
|
||||
// from any goroutine. Note that if the limit is lowered, active conns will not
|
||||
// be closed and may remain over the limit until they close naturally.
|
||||
func (l *Limiter) SetConfig(c Config) {
|
||||
l.cfg.Store(c)
|
||||
}
|
||||
|
||||
// HTTPConnStateFunc returns a func that can be passed as the ConnState field of
|
||||
// an http.Server. This intercepts new HTTP connections to the server and
|
||||
// applies the limiting to new connections.
|
||||
//
|
||||
// Note that if the conn is hijacked from the HTTP server then it will be freed
|
||||
// in the limiter as if it was closed. Servers that use Hijacking must implement
|
||||
// their own calls if they need to continue limiting the number of concurrent
|
||||
// hijacked connections.
|
||||
func (l *Limiter) HTTPConnStateFunc() func(net.Conn, http.ConnState) {
|
||||
return func(conn net.Conn, state http.ConnState) {
|
||||
switch state {
|
||||
case http.StateNew:
|
||||
_, err := l.Accept(conn)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
}
|
||||
case http.StateHijacked:
|
||||
l.freeConn(conn)
|
||||
case http.StateClosed:
|
||||
// Maybe free the conn. This might be a conn we closed in the case above
|
||||
// that was never counted as it was over limit but freeConn will be a
|
||||
// no-op in that case.
|
||||
l.freeConn(conn)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
module github.com/hashicorp/go-connlimit
|
||||
|
||||
go 1.12
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/stretchr/testify v1.4.0
|
||||
)
|
|
@ -0,0 +1,13 @@
|
|||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|
@ -0,0 +1,27 @@
|
|||
package connlimit
|
||||
|
||||
import "net"
|
||||
|
||||
// WrappedConn wraps a net.Conn and free() func returned by Limiter.Accept so
|
||||
// that when the wrapped connections Close method is called, its free func is
|
||||
// also called.
|
||||
type WrappedConn struct {
|
||||
net.Conn
|
||||
free func()
|
||||
}
|
||||
|
||||
// Wrap wraps a net.Conn's Close method so free() is called when Close is
|
||||
// called. Useful when handing off tracked connections to libraries that close
|
||||
// them.
|
||||
func Wrap(conn net.Conn, free func()) net.Conn {
|
||||
return &WrappedConn{
|
||||
Conn: conn,
|
||||
free: free,
|
||||
}
|
||||
}
|
||||
|
||||
// Close frees the tracked connection and closes the underlying net.Conn.
|
||||
func (w *WrappedConn) Close() error {
|
||||
w.free()
|
||||
return w.Conn.Close()
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
engines:
|
||||
gofmt:
|
||||
enabled: true
|
||||
golint:
|
||||
enabled: true
|
||||
govet:
|
||||
enabled: true
|
||||
|
||||
exclude_patterns:
|
||||
- ".github/"
|
||||
- "vendor/"
|
||||
- "codegen/"
|
||||
- "doc.go"
|
|
@ -1,4 +1,11 @@
|
|||
/dep
|
||||
/testdep
|
||||
/profile.out
|
||||
/coverage.txt
|
||||
# Binaries for programs and plugins
|
||||
*.exe
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
|
||||
# Test binary, build with `go test -c`
|
||||
*.test
|
||||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
|
|
|
@ -4,10 +4,22 @@ go:
|
|||
- 1.9
|
||||
- tip
|
||||
|
||||
env:
|
||||
global:
|
||||
- CC_TEST_REPORTER_ID=68feaa3410049ce73e145287acbcdacc525087a30627f96f04e579e75bd71c00
|
||||
|
||||
before_script:
|
||||
- curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./cc-test-reporter
|
||||
- chmod +x ./cc-test-reporter
|
||||
- ./cc-test-reporter before-build
|
||||
|
||||
install:
|
||||
- go get github.com/go-task/task/cmd/task
|
||||
|
||||
script:
|
||||
- task dl-deps
|
||||
- task lint
|
||||
- task test
|
||||
- task test-coverage
|
||||
|
||||
after_script:
|
||||
- ./cc-test-reporter after-build --exit-code $TRAVIS_TEST_RESULT
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
|
||||
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/davecgh/go-spew"
|
||||
packages = ["spew"]
|
||||
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
|
||||
version = "v1.1.0"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/pmezard/go-difflib"
|
||||
packages = ["difflib"]
|
||||
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
|
||||
version = "v1.0.0"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/stretchr/testify"
|
||||
packages = [
|
||||
"assert",
|
||||
"require"
|
||||
]
|
||||
revision = "b91bfb9ebec76498946beb6af7c0230c7cc7ba6c"
|
||||
version = "v1.2.0"
|
||||
|
||||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "2d160a7dea4ffd13c6c31dab40373822f9d78c73beba016d662bef8f7a998876"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
|
@ -0,0 +1,8 @@
|
|||
[prune]
|
||||
unused-packages = true
|
||||
non-go = true
|
||||
go-tests = true
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/stretchr/testify"
|
||||
version = "~1.2.0"
|
|
@ -1,6 +1,8 @@
|
|||
# Objx
|
||||
[![Build Status](https://travis-ci.org/stretchr/objx.svg?branch=master)](https://travis-ci.org/stretchr/objx)
|
||||
[![Go Report Card](https://goreportcard.com/badge/github.com/stretchr/objx)](https://goreportcard.com/report/github.com/stretchr/objx)
|
||||
[![Maintainability](https://api.codeclimate.com/v1/badges/1d64bc6c8474c2074f2b/maintainability)](https://codeclimate.com/github/stretchr/objx/maintainability)
|
||||
[![Test Coverage](https://api.codeclimate.com/v1/badges/1d64bc6c8474c2074f2b/test_coverage)](https://codeclimate.com/github/stretchr/objx/test_coverage)
|
||||
[![Sourcegraph](https://sourcegraph.com/github.com/stretchr/objx/-/badge.svg)](https://sourcegraph.com/github.com/stretchr/objx)
|
||||
[![GoDoc](https://godoc.org/github.com/stretchr/objx?status.svg)](https://godoc.org/github.com/stretchr/objx)
|
||||
|
||||
|
|
|
@ -12,11 +12,12 @@ update-deps:
|
|||
cmds:
|
||||
- dep ensure
|
||||
- dep ensure -update
|
||||
- dep prune
|
||||
|
||||
lint:
|
||||
desc: Runs golint
|
||||
cmds:
|
||||
- go fmt $(go list ./... | grep -v /vendor/)
|
||||
- go vet $(go list ./... | grep -v /vendor/)
|
||||
- golint $(ls *.go | grep -v "doc.go")
|
||||
silent: true
|
||||
|
||||
|
@ -24,3 +25,8 @@ test:
|
|||
desc: Runs go tests
|
||||
cmds:
|
||||
- go test -race .
|
||||
|
||||
test-coverage:
|
||||
desc: Runs go tests and calucates test coverage
|
||||
cmds:
|
||||
- go test -coverprofile=c.out .
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package objx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -28,7 +27,7 @@ var arrayAccesRegex = regexp.MustCompile(arrayAccesRegexString)
|
|||
//
|
||||
// o.Get("books[1].chapters[2].title")
|
||||
func (m Map) Get(selector string) *Value {
|
||||
rawObj := access(m, selector, nil, false, false)
|
||||
rawObj := access(m, selector, nil, false)
|
||||
return &Value{data: rawObj}
|
||||
}
|
||||
|
||||
|
@ -43,34 +42,25 @@ func (m Map) Get(selector string) *Value {
|
|||
//
|
||||
// o.Set("books[1].chapters[2].title","Time to Go")
|
||||
func (m Map) Set(selector string, value interface{}) Map {
|
||||
access(m, selector, value, true, false)
|
||||
access(m, selector, value, true)
|
||||
return m
|
||||
}
|
||||
|
||||
// access accesses the object using the selector and performs the
|
||||
// appropriate action.
|
||||
func access(current, selector, value interface{}, isSet, panics bool) interface{} {
|
||||
|
||||
func access(current, selector, value interface{}, isSet bool) interface{} {
|
||||
switch selector.(type) {
|
||||
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
|
||||
|
||||
if array, ok := current.([]interface{}); ok {
|
||||
index := intFromInterface(selector)
|
||||
|
||||
if index >= len(array) {
|
||||
if panics {
|
||||
panic(fmt.Sprintf("objx: Index %d is out of range. Slice only contains %d items.", index, len(array)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return array[index]
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
case string:
|
||||
|
||||
selStr := selector.(string)
|
||||
selSegs := strings.SplitN(selStr, PathSeparator, 2)
|
||||
thisSel := selSegs[0]
|
||||
|
@ -79,7 +69,6 @@ func access(current, selector, value interface{}, isSet, panics bool) interface{
|
|||
|
||||
if strings.Contains(thisSel, "[") {
|
||||
arrayMatches := arrayAccesRegex.FindStringSubmatch(thisSel)
|
||||
|
||||
if len(arrayMatches) > 0 {
|
||||
// Get the key into the map
|
||||
thisSel = arrayMatches[1]
|
||||
|
@ -94,11 +83,9 @@ func access(current, selector, value interface{}, isSet, panics bool) interface{
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if curMap, ok := current.(Map); ok {
|
||||
current = map[string]interface{}(curMap)
|
||||
}
|
||||
|
||||
// get the object in question
|
||||
switch current.(type) {
|
||||
case map[string]interface{}:
|
||||
|
@ -111,29 +98,19 @@ func access(current, selector, value interface{}, isSet, panics bool) interface{
|
|||
default:
|
||||
current = nil
|
||||
}
|
||||
|
||||
if current == nil && panics {
|
||||
panic(fmt.Sprintf("objx: '%v' invalid on object.", selector))
|
||||
}
|
||||
|
||||
// do we need to access the item of an array?
|
||||
if index > -1 {
|
||||
if array, ok := current.([]interface{}); ok {
|
||||
if index < len(array) {
|
||||
current = array[index]
|
||||
} else {
|
||||
if panics {
|
||||
panic(fmt.Sprintf("objx: Index %d is out of range. Slice only contains %d items.", index, len(array)))
|
||||
}
|
||||
current = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(selSegs) > 1 {
|
||||
current = access(current, selSegs[1], value, isSet, panics)
|
||||
current = access(current, selSegs[1], value, isSet)
|
||||
}
|
||||
|
||||
}
|
||||
return current
|
||||
}
|
||||
|
@ -165,7 +142,7 @@ func intFromInterface(selector interface{}) int {
|
|||
case uint64:
|
||||
value = int(selector.(uint64))
|
||||
default:
|
||||
panic("objx: array access argument is not an integer type (this should never happen)")
|
||||
return 0
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package objx
|
||||
|
||||
const (
|
||||
// PathSeparator is the character used to separate the elements
|
||||
// of the keypath.
|
||||
//
|
||||
// For example, `location.address.city`
|
||||
PathSeparator string = "."
|
||||
|
||||
// SignatureSeparator is the character that is used to
|
||||
// separate the Base64 string from the security signature.
|
||||
SignatureSeparator = "_"
|
||||
)
|
|
@ -47,9 +47,8 @@ func New(data interface{}) Map {
|
|||
//
|
||||
// The arguments follow a key, value pattern.
|
||||
//
|
||||
// Panics
|
||||
//
|
||||
// Panics if any key argument is non-string or if there are an odd number of arguments.
|
||||
// Returns nil if any key argument is non-string or if there are an odd number of arguments.
|
||||
//
|
||||
// Example
|
||||
//
|
||||
|
@ -58,14 +57,13 @@ func New(data interface{}) Map {
|
|||
// m := objx.MSI("name", "Mat", "age", 29, "subobj", objx.MSI("active", true))
|
||||
//
|
||||
// // creates an Map equivalent to
|
||||
// m := objx.New(map[string]interface{}{"name": "Mat", "age": 29, "subobj": map[string]interface{}{"active": true}})
|
||||
// m := objx.Map{"name": "Mat", "age": 29, "subobj": objx.Map{"active": true}}
|
||||
func MSI(keyAndValuePairs ...interface{}) Map {
|
||||
newMap := make(map[string]interface{})
|
||||
newMap := Map{}
|
||||
keyAndValuePairsLen := len(keyAndValuePairs)
|
||||
if keyAndValuePairsLen%2 != 0 {
|
||||
panic("objx: MSI must have an even number of arguments following the 'key, value' pattern.")
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < keyAndValuePairsLen; i = i + 2 {
|
||||
key := keyAndValuePairs[i]
|
||||
value := keyAndValuePairs[i+1]
|
||||
|
@ -73,11 +71,11 @@ func MSI(keyAndValuePairs ...interface{}) Map {
|
|||
// make sure the key is a string
|
||||
keyString, keyStringOK := key.(string)
|
||||
if !keyStringOK {
|
||||
panic("objx: MSI must follow 'string, interface{}' pattern. " + keyString + " is not a valid key.")
|
||||
return nil
|
||||
}
|
||||
newMap[keyString] = value
|
||||
}
|
||||
return New(newMap)
|
||||
return newMap
|
||||
}
|
||||
|
||||
// ****** Conversion Constructors
|
||||
|
@ -170,12 +168,11 @@ func FromURLQuery(query string) (Map, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := make(map[string]interface{})
|
||||
m := Map{}
|
||||
for k, vals := range vals {
|
||||
m[k] = vals[0]
|
||||
}
|
||||
return New(m), nil
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// MustFromURLQuery generates a new Obj by parsing the specified
|
||||
|
|
|
@ -5,14 +5,7 @@ package objx
|
|||
func (m Map) Exclude(exclude []string) Map {
|
||||
excluded := make(Map)
|
||||
for k, v := range m {
|
||||
var shouldInclude = true
|
||||
for _, toExclude := range exclude {
|
||||
if k == toExclude {
|
||||
shouldInclude = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if shouldInclude {
|
||||
if !contains(exclude, k) {
|
||||
excluded[k] = v
|
||||
}
|
||||
}
|
||||
|
@ -21,11 +14,11 @@ func (m Map) Exclude(exclude []string) Map {
|
|||
|
||||
// Copy creates a shallow copy of the Obj.
|
||||
func (m Map) Copy() Map {
|
||||
copied := make(map[string]interface{})
|
||||
copied := Map{}
|
||||
for k, v := range m {
|
||||
copied[k] = v
|
||||
}
|
||||
return New(copied)
|
||||
return copied
|
||||
}
|
||||
|
||||
// Merge blends the specified map with a copy of this map and returns the result.
|
||||
|
@ -52,12 +45,12 @@ func (m Map) MergeHere(merge Map) Map {
|
|||
// to change the keys and values as it goes. This method requires that
|
||||
// the wrapped object be a map[string]interface{}
|
||||
func (m Map) Transform(transformer func(key string, value interface{}) (string, interface{})) Map {
|
||||
newMap := make(map[string]interface{})
|
||||
newMap := Map{}
|
||||
for k, v := range m {
|
||||
modifiedKey, modifiedVal := transformer(k, v)
|
||||
newMap[modifiedKey] = modifiedVal
|
||||
}
|
||||
return New(newMap)
|
||||
return newMap
|
||||
}
|
||||
|
||||
// TransformKeys builds a new map using the specified key mapping.
|
||||
|
@ -72,3 +65,13 @@ func (m Map) TransformKeys(mapping map[string]string) Map {
|
|||
return key, value
|
||||
})
|
||||
}
|
||||
|
||||
// Checks if a string slice contains a string
|
||||
func contains(s []string, e string) bool {
|
||||
for _, a := range s {
|
||||
if a == e {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -5,13 +5,8 @@ import (
|
|||
"encoding/hex"
|
||||
)
|
||||
|
||||
// HashWithKey hashes the specified string using the security
|
||||
// key.
|
||||
// HashWithKey hashes the specified string using the security key
|
||||
func HashWithKey(data, key string) string {
|
||||
hash := sha1.New()
|
||||
_, err := hash.Write([]byte(data + ":" + key))
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return hex.EncodeToString(hash.Sum(nil))
|
||||
d := sha1.Sum([]byte(data + ":" + key))
|
||||
return hex.EncodeToString(d[:])
|
||||
}
|
||||
|
|
|
@ -30,8 +30,6 @@ func (v *Value) String() string {
|
|||
return strconv.FormatFloat(v.Float64(), 'f', -1, 64)
|
||||
case v.IsInt():
|
||||
return strconv.FormatInt(int64(v.Int()), 10)
|
||||
case v.IsInt():
|
||||
return strconv.FormatInt(int64(v.Int()), 10)
|
||||
case v.IsInt8():
|
||||
return strconv.FormatInt(int64(v.Int8()), 10)
|
||||
case v.IsInt16():
|
||||
|
@ -51,6 +49,5 @@ func (v *Value) String() string {
|
|||
case v.IsUint64():
|
||||
return strconv.FormatUint(v.Uint64(), 10)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%#v", v.Data())
|
||||
}
|
||||
|
|
|
@ -177,6 +177,8 @@ github.com/hashicorp/go-bexpr
|
|||
github.com/hashicorp/go-checkpoint
|
||||
# github.com/hashicorp/go-cleanhttp v0.5.1
|
||||
github.com/hashicorp/go-cleanhttp
|
||||
# github.com/hashicorp/go-connlimit v0.1.0
|
||||
github.com/hashicorp/go-connlimit
|
||||
# github.com/hashicorp/go-discover v0.0.0-20191202160150-7ec2cfbda7a2
|
||||
github.com/hashicorp/go-discover
|
||||
github.com/hashicorp/go-discover/provider/k8s
|
||||
|
@ -369,7 +371,7 @@ github.com/softlayer/softlayer-go/sl
|
|||
github.com/softlayer/softlayer-go/config
|
||||
# github.com/spf13/pflag v1.0.3
|
||||
github.com/spf13/pflag
|
||||
# github.com/stretchr/objx v0.1.0
|
||||
# github.com/stretchr/objx v0.1.1
|
||||
github.com/stretchr/objx
|
||||
# github.com/stretchr/testify v1.4.0
|
||||
github.com/stretchr/testify/mock
|
||||
|
|
|
@ -1369,18 +1369,67 @@ default will automatically work with some tooling.
|
|||
value was unconditionally set to `false`). On agents in client-mode, this defaults to `true`
|
||||
and for agents in server-mode, this defaults to `false`.
|
||||
|
||||
* <a name="limits"></a><a href="#limits">`limits`</a> Available in Consul 0.9.3 and later, this
|
||||
is a nested object that configures limits that are enforced by the agent. Currently, this only
|
||||
applies to agents in client mode, not Consul servers. The following parameters are available:
|
||||
* <a name="limits"></a><a href="#limits">`limits`</a> Available in Consul 0.9.3
|
||||
and later, this is a nested object that configures limits that are enforced by
|
||||
the agent. The following parameters are available:
|
||||
|
||||
* <a name="rpc_rate"></a><a href="#rpc_rate">`rpc_rate`</a> - Configures the RPC rate
|
||||
limiter by setting the maximum request rate that this agent is allowed to make for RPC
|
||||
requests to Consul servers, in requests per second. Defaults to infinite, which disables
|
||||
* <a name="http_max_conns_per_client"></a><a
|
||||
href="#http_max_conns_per_client">`http_max_conns_per_client`</a> -
|
||||
Configures a limit of how many concurrent TCP connections a single
|
||||
client IP address is allowed to open to the agent's HTTP(S) server. This
|
||||
affects the HTTP(S) servers in both client and server agents. Default
|
||||
value is `100`.
|
||||
* <a name="https_handshake_timeout"></a><a
|
||||
href="#https_handshake_timeout">`https_handshake_timeout`</a> -
|
||||
Configures the limit for how long the HTTPS server in both client and
|
||||
server agents will wait for a client to complete a TLS handshake. This
|
||||
should be kept conservative as it limits how many connections an
|
||||
unauthenticated attacker can open if `verify_incoming` is being using to
|
||||
authenticate clients (strongly recommended in production). Default value
|
||||
is `5s`.
|
||||
* <a name="rpc_handshake_timeout"></a><a
|
||||
href="#rpc_handshake_timeout">`rpc_handshake_timeout`</a> - Configures
|
||||
the limit for how long servers will wait after a client TCP connection
|
||||
is established before they complete the connection handshake. When TLS
|
||||
is used, the same timeout applies to the TLS handshake separately from
|
||||
the initial protocol negotiation. All Consul clients should perform this
|
||||
immediately on establishing a new connection. This should be kept
|
||||
conservative as it limits how many connections an unauthenticated
|
||||
attacker can open if `verify_incoming` is being using to authenticate
|
||||
clients (strongly recommended in production). When `verify_incoming` is
|
||||
true on servers, this limits how long the connection socket and
|
||||
associated goroutines will be held open before the client successfully
|
||||
authenticates. Default value is `5s`.
|
||||
* <a name="rpc_max_conns_per_client"></a><a
|
||||
href="#rpc_max_conns_per_client">`rpc_max_conns_per_client`</a> -
|
||||
Configures a limit of how many concurrent TCP connections a single
|
||||
source IP address is allowed to open to a single server. It affects both
|
||||
clients connections and other server connections. In general Consul
|
||||
clients multiplex many RPC calls over a single TCP connection so this
|
||||
can typically be kept low. It needs to be more than one though since
|
||||
servers open at least one additional connection for raft RPC, possibly
|
||||
more for WAN federation when using network areas, and snapshot requests
|
||||
from clients run over a separate TCP conn. A reasonably low limit
|
||||
significantly reduces the ability of an unauthenticated attacker to
|
||||
consume unbounded resources by holding open many connections. You may
|
||||
need to increase this if WAN federated servers connect via proxies or
|
||||
NAT gateways or similar causing many legitimate connections from a
|
||||
single source IP. Default value is `100` which is designed to be
|
||||
extremely conservative to limit issues with certain deployment patterns.
|
||||
Most deployments can probably reduce this safely. 100 connections on
|
||||
modern server hardware should not cause a significant impact on resource
|
||||
usage from an unauthenticated attacker though.
|
||||
* <a name="rpc_rate"></a><a href="#rpc_rate">`rpc_rate`</a> - Configures
|
||||
the RPC rate limiter on Consul _clients_ by setting the maximum request
|
||||
rate that this agent is allowed to make for RPC requests to Consul
|
||||
servers, in requests per second. Defaults to infinite, which disables
|
||||
rate limiting.
|
||||
* <a name="rpc_rate"></a><a href="#rpc_max_burst">`rpc_max_burst`</a> - The size of the token
|
||||
bucket used to recharge the RPC rate limiter. Defaults to 1000 tokens, and each token is
|
||||
good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket
|
||||
for more details about how token bucket rate limiters operate.
|
||||
* <a name="rpc_max_burst"></a><a href="#rpc_max_burst">`rpc_max_burst`</a> -
|
||||
The size of the token bucket used to recharge the RPC rate limiter on
|
||||
Consul _clients_ . Defaults to 1000 tokens, and each token is good for a
|
||||
single RPC call to a Consul server. See
|
||||
https://en.wikipedia.org/wiki/Token_bucket for more details about how
|
||||
token bucket rate limiters operate.
|
||||
|
||||
* <a name="log_file"></a><a href="#log_file">`log_file`</a> Equivalent to the
|
||||
[`-log-file` command-line flag](#_log_file).
|
||||
|
|
Loading…
Reference in New Issue