block PeerName register requests (#13887)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
alex 2022-07-29 14:36:22 -07:00 committed by GitHub
parent 95096e2c03
commit a45bb1f06b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 290 additions and 72 deletions

View File

@ -1342,6 +1342,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
cfg.SerfLANConfig = consul.CloneSerfLANConfig(cfg.SerfLANConfig)
cfg.PeeringEnabled = runtimeCfg.PeeringEnabled
cfg.PeeringTestAllowPeerRegistrations = runtimeCfg.PeeringTestAllowPeerRegistrations
enterpriseConsulConfig(cfg, runtimeCfg)
return cfg, nil

View File

@ -20,6 +20,60 @@ import (
"github.com/hashicorp/consul/testrpc"
)
func TestCatalogRegister_PeeringRegistration(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
t.Run("deny peer registrations by default", func(t *testing.T) {
a := NewTestAgent(t, "")
defer a.Shutdown()
// Register request with peer
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
obj, err := a.srv.CatalogRegister(nil, req)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
require.Nil(t, obj)
})
t.Run("cannot hcl set the peer registrations config", func(t *testing.T) {
// this will have no effect, as the value is overriden in non user source
a := NewTestAgent(t, "peering = { test_allow_peer_registrations = true }")
defer a.Shutdown()
// Register request with peer
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
obj, err := a.srv.CatalogRegister(nil, req)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
require.Nil(t, obj)
})
t.Run("allow peer registrations with test overrides", func(t *testing.T) {
// the only way to set the config in the agent is via the overrides
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
// Register request with peer
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
obj, err := a.srv.CatalogRegister(nil, req)
require.NoError(t, err)
applied, ok := obj.(bool)
require.True(t, ok)
require.True(t, applied)
})
}
func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -1009,66 +1009,67 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
LogRotateBytes: intVal(c.LogRotateBytes),
LogRotateMaxFiles: intVal(c.LogRotateMaxFiles),
},
MaxQueryTime: b.durationVal("max_query_time", c.MaxQueryTime),
NodeID: types.NodeID(stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
ReadReplica: boolVal(c.ReadReplica),
PeeringEnabled: boolVal(c.Peering.Enabled),
PidFile: stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),
PrimaryGatewaysInterval: b.durationVal("primary_gateways_interval", c.PrimaryGatewaysInterval),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(float64Val(c.Limits.RPCRate)),
RPCConfig: consul.RPCConfig{EnableStreaming: boolValWithDefault(c.RPC.EnableStreaming, serverMode)},
RaftProtocol: intVal(c.RaftProtocol),
RaftSnapshotThreshold: intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
RaftTrailingLogs: intVal(c.RaftTrailingLogs),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: stringVal(c.SegmentName),
Segments: segments,
SegmentLimit: intVal(c.SegmentLimit),
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfAllowedCIDRsLAN: serfAllowedCIDRSLAN,
SerfAllowedCIDRsWAN: serfAllowedCIDRSWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: uint64Val(c.Limits.TxnMaxReqLen),
UIConfig: b.uiConfigVal(c.UIConfig),
UnixSocketGroup: stringVal(c.UnixSocket.Group),
UnixSocketMode: stringVal(c.UnixSocket.Mode),
UnixSocketUser: stringVal(c.UnixSocket.User),
Watches: c.Watches,
AutoReloadConfigCoalesceInterval: 1 * time.Second,
MaxQueryTime: b.durationVal("max_query_time", c.MaxQueryTime),
NodeID: types.NodeID(stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
ReadReplica: boolVal(c.ReadReplica),
PeeringEnabled: boolVal(c.Peering.Enabled),
PeeringTestAllowPeerRegistrations: boolValWithDefault(c.Peering.TestAllowPeerRegistrations, false),
PidFile: stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),
PrimaryGatewaysInterval: b.durationVal("primary_gateways_interval", c.PrimaryGatewaysInterval),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(float64Val(c.Limits.RPCRate)),
RPCConfig: consul.RPCConfig{EnableStreaming: boolValWithDefault(c.RPC.EnableStreaming, serverMode)},
RaftProtocol: intVal(c.RaftProtocol),
RaftSnapshotThreshold: intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
RaftTrailingLogs: intVal(c.RaftTrailingLogs),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: stringVal(c.SegmentName),
Segments: segments,
SegmentLimit: intVal(c.SegmentLimit),
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfAllowedCIDRsLAN: serfAllowedCIDRSLAN,
SerfAllowedCIDRsWAN: serfAllowedCIDRSWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: uint64Val(c.Limits.TxnMaxReqLen),
UIConfig: b.uiConfigVal(c.UIConfig),
UnixSocketGroup: stringVal(c.UnixSocket.Group),
UnixSocketMode: stringVal(c.UnixSocket.Mode),
UnixSocketUser: stringVal(c.UnixSocket.User),
Watches: c.Watches,
AutoReloadConfigCoalesceInterval: 1 * time.Second,
}
rt.TLS, err = b.buildTLSConfig(rt, c.TLS)

View File

@ -891,4 +891,8 @@ type TLS struct {
type Peering struct {
Enabled *bool `mapstructure:"enabled"`
// TestAllowPeerRegistrations controls whether CatalogRegister endpoints allow registrations for objects with `PeerName`
// This always gets overridden in NonUserSource()
TestAllowPeerRegistrations *bool `mapstructure:"test_allow_peer_registrations"`
}

View File

@ -207,6 +207,11 @@ func NonUserSource() Source {
# the max time before leaf certs can be generated after a roots change.
test_ca_leaf_root_change_spread = "0s"
}
peering = {
# We use peer registration for various testing
test_allow_peer_registrations = false
}
`,
}
}

View File

@ -818,6 +818,10 @@ type RuntimeConfig struct {
// hcl: peering { enabled = (true|false) }
PeeringEnabled bool
// TestAllowPeerRegistrations controls whether CatalogRegister endpoints allow
// registrations for objects with `PeerName`
PeeringTestAllowPeerRegistrations bool
// PidFile is the file to store our PID in.
//
// hcl: pid_file = string

View File

@ -236,6 +236,7 @@
"NodeMeta": {},
"NodeName": "",
"PeeringEnabled": false,
"PeeringTestAllowPeerRegistrations": false,
"PidFile": "",
"PrimaryDatacenter": "",
"PrimaryGateways": [

View File

@ -74,9 +74,36 @@ type Catalog struct {
logger hclog.Logger
}
func hasPeerNameInRequest(req *structs.RegisterRequest) bool {
if req == nil {
return false
}
// nodes, services, checks
if req.PeerName != structs.DefaultPeerKeyword {
return true
}
if req.Service != nil && req.Service.PeerName != structs.DefaultPeerKeyword {
return true
}
if req.Check != nil && req.Check.PeerName != structs.DefaultPeerKeyword {
return true
}
for _, check := range req.Checks {
if check.PeerName != structs.DefaultPeerKeyword {
return true
}
}
return false
}
// Register a service and/or check(s) in a node, creating the node if it doesn't exist.
// It is valid to pass no service or checks to simply create the node itself.
func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error {
if !c.srv.config.PeeringTestAllowPeerRegistrations && hasPeerNameInRequest(args) {
return fmt.Errorf("cannot register requests with PeerName in them")
}
if done, err := c.srv.ForwardRPC("Catalog.Register", args, reply); done {
return err
}

View File

@ -2765,6 +2765,104 @@ node_prefix "" {
return
}
// TestCatalog_Register_DenyPeeringRegistration makes sure that users cannot send structs.RegisterRequest
// with a PeerName in any part of the request.
func TestCatalog_Register_DenyPeeringRegistration(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, s := testServerWithConfig(t)
codec := rpcClient(t, s)
// we will add PeerName to copies of arg
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
Check: &structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db",
},
},
}
type testcase struct {
name string
reqCopyFn func(arg *structs.RegisterRequest) structs.RegisterRequest
}
testCases := []testcase{
{
name: "peer name on top level",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.PeerName = "foo"
return copyR
},
},
{
name: "peer name in service",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.Service.PeerName = "foo"
return copyR
},
},
{
name: "peer name in check",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.Check.PeerName = "foo"
return copyR
},
},
{
name: "peer name in checks",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.Checks[0].PeerName = "foo"
return copyR
},
},
{
name: "peer name everywhere",
reqCopyFn: func(arg *structs.RegisterRequest) structs.RegisterRequest {
copyR := *arg
copyR.PeerName = "foo1"
copyR.Service.PeerName = "foo2"
copyR.Check.PeerName = "foo3"
copyR.Checks[0].PeerName = "foo4"
return copyR
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
req := tc.reqCopyFn(&arg)
var out struct{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
})
}
}
func TestCatalog_ListServices_FilterACL(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -399,6 +399,8 @@ type Config struct {
// PeeringEnabled enables cluster peering.
PeeringEnabled bool
PeeringTestAllowPeerRegistrations bool
// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
}
@ -515,7 +517,8 @@ func DefaultConfig() *Config {
DefaultQueryTime: 300 * time.Second,
MaxQueryTime: 600 * time.Second,
PeeringEnabled: true,
PeeringEnabled: true,
PeeringTestAllowPeerRegistrations: false,
EnterpriseConfig: DefaultEnterpriseConfig(),
}

View File

@ -558,7 +558,9 @@ func TestHealth_ServiceNodes(t *testing.T) {
}
t.Parallel()
_, s1 := testServer(t)
_, s1 := testServerWithConfig(t, func(config *Config) {
config.PeeringTestAllowPeerRegistrations = true
})
codec := rpcClient(t, s1)
waitForLeaderEstablishment(t, s1)

View File

@ -32,7 +32,9 @@ func TestInternal_NodeInfo(t *testing.T) {
}
t.Parallel()
_, s1 := testServer(t)
_, s1 := testServerWithConfig(t, func(config *Config) {
config.PeeringTestAllowPeerRegistrations = true
})
codec := rpcClient(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -113,7 +115,9 @@ func TestInternal_NodeDump(t *testing.T) {
}
t.Parallel()
_, s1 := testServer(t)
_, s1 := testServerWithConfig(t, func(config *Config) {
config.PeeringTestAllowPeerRegistrations = true
})
codec := rpcClient(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -221,7 +225,9 @@ func TestInternal_NodeDump_Filter(t *testing.T) {
}
t.Parallel()
_, s1 := testServer(t)
_, s1 := testServerWithConfig(t, func(config *Config) {
config.PeeringTestAllowPeerRegistrations = true
})
codec := rpcClient(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
@ -1756,7 +1762,9 @@ func TestInternal_ServiceDump_Peering(t *testing.T) {
}
t.Parallel()
_, s1 := testServer(t)
_, s1 := testServerWithConfig(t, func(config *Config) {
config.PeeringTestAllowPeerRegistrations = true
})
codec := rpcClient(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -1766,8 +1766,9 @@ func TestDNS_VirtualIPLookup(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
server, ok := a.delegate.(*consul.Server)

View File

@ -608,7 +608,9 @@ func TestHealthServiceNodes(t *testing.T) {
}
t.Parallel()
a := NewTestAgent(t, "")
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
testingPeerNames := []string{"", "my-peer"}
@ -817,8 +819,9 @@ use_streaming_backend = true
AllowedPrefixes: []string{"testing.grpc."},
}, sink)
a := NewTestAgent(t, tc.hcl)
a := StartTestAgent(t, TestAgent{HCL: tc.hcl, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register some initial service instances
@ -1013,8 +1016,9 @@ use_streaming_backend = true
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
a := NewTestAgent(t, tc.hcl)
a := StartTestAgent(t, TestAgent{HCL: tc.hcl, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register one with a tag.

View File

@ -475,6 +475,9 @@ func TestConfig(logger hclog.Logger, sources ...config.Source) *config.RuntimeCo
// tiny delay is effectively thre same.
cfg.ConnectTestCALeafRootChangeSpread = 1 * time.Nanosecond
// allows registering objects with the PeerName
cfg.PeeringTestAllowPeerRegistrations = true
return cfg
}

View File

@ -84,8 +84,9 @@ func TestUINodes(t *testing.T) {
}
t.Parallel()
a := NewTestAgent(t, "")
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
args := []*structs.RegisterRequest{
@ -263,8 +264,9 @@ func TestUIServices(t *testing.T) {
}
t.Parallel()
a := NewTestAgent(t, "")
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
requests := []*structs.RegisterRequest{