Add Proxy Upstreams to Service Definition (#4639)

* Refactor Service Definition ProxyDestination.

This includes:
 - Refactoring all internal structs used
 - Updated tests for both deprecated and new input for:
   - Agent Services endpoint response
   - Agent Service endpoint response
   - Agent Register endpoint
     - Unmanaged deprecated field
     - Unmanaged new fields
     - Managed deprecated upstreams
     - Managed new
   - Catalog Register
     - Unmanaged deprecated field
     - Unmanaged new fields
     - Managed deprecated upstreams
     - Managed new
   - Catalog Services endpoint response
   - Catalog Node endpoint response
   - Catalog Service endpoint response
 - Updated API tests for all of the above too (both deprecated and new forms of register)

TODO:
 - config package changes for on-disk service definitions
 - proxy config endpoint
 - built-in proxy support for new fields

* Agent proxy config endpoint updated with upstreams

* Config file changes for upstreams.

* Add upstream opaque config and update all tests to ensure it works everywhere.

* Built in proxy working with new Upstreams config

* Command fixes and deprecations

* Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts...

TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct.

* Fix translated keys in API registration.
≈

* Fixes from docs
 - omit some empty undocumented fields in API
 - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally.

* Documentation updates for Upstreams in service definition

* Fixes for tests broken by many refactors.

* Enable travis on f-connect branch in this branch too.

* Add consistent Deprecation comments to ProxyDestination uses

* Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
This commit is contained in:
Paul Banks 2018-09-12 17:07:47 +01:00
parent b06ddc9187
commit b83bbf248c
51 changed files with 2150 additions and 776 deletions

View File

@ -190,24 +190,39 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
EnableTagOverride: s.EnableTagOverride,
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
ProxyDestination: s.ProxyDestination,
Weights: weights,
}
if as.Tags == nil {
as.Tags = []string{}
}
if as.Meta == nil {
as.Meta = map[string]string{}
}
// Attach Connect configs if the exist
// Attach Unmanaged Proxy config if exists
if s.Kind == structs.ServiceKindConnectProxy {
as.Proxy = s.Proxy.ToAPI()
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
// Also set the deprecated ProxyDestination
as.ProxyDestination = as.Proxy.DestinationServiceName
}
// Attach Connect configs if the exist. We use the actual proxy state since
// that may have had defaults filled in compared to the config that was
// provided with the service as stored in the NodeService here.
if proxy, ok := proxies[id+"-proxy"]; ok {
as.Connect = &api.AgentServiceConnect{
Proxy: &api.AgentServiceConnectProxy{
ExecMode: api.ProxyExecMode(proxy.Proxy.ExecMode.String()),
Command: proxy.Proxy.Command,
Config: proxy.Proxy.Config,
Upstreams: proxy.Proxy.Upstreams.ToAPI(),
},
}
} else if s.Connect.Native {
as.Connect = &api.AgentServiceConnect{
Native: true,
}
}
agentSvcs[id] = as
}
@ -546,6 +561,29 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
"enable_tag_override": "EnableTagOverride",
})
// Translate upstream keys - we have the same upstream format in two
// possible places.
translateUpstreams := func(rawMap map[string]interface{}) {
var upstreams []interface{}
if us, ok := rawMap["upstreams"].([]interface{}); ok {
upstreams = us
}
if us, ok := rawMap["Upstreams"].([]interface{}); ok {
upstreams = us
}
for _, u := range upstreams {
if uMap, ok := u.(map[string]interface{}); ok {
config.TranslateKeys(uMap, map[string]string{
"destination_name": "DestinationName",
"destination_type": "DestinationType",
"destination_namespace": "DestinationNamespace",
"local_bind_port": "LocalBindPort",
"local_bind_address": "LocalBindAddress",
})
}
}
}
for k, v := range rawMap {
switch strings.ToLower(k) {
case "check":
@ -562,6 +600,32 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
return err
}
}
case "proxy":
if valMap, ok := v.(map[string]interface{}); ok {
config.TranslateKeys(valMap, map[string]string{
"destination_service_name": "DestinationServiceName",
"destination_service_id": "DestinationServiceID",
"local_service_port": "LocalServicePort",
"local_service_address": "LocalServiceAddress",
})
translateUpstreams(valMap)
}
case "connect":
if connectMap, ok := v.(map[string]interface{}); ok {
var proxyMap map[string]interface{}
if pMap, ok := connectMap["Proxy"].(map[string]interface{}); ok {
proxyMap = pMap
}
if pMap, ok := connectMap["proxy"].(map[string]interface{}); ok {
proxyMap = pMap
}
if proxyMap != nil {
config.TranslateKeys(proxyMap, map[string]string{
"exec_mode": "ExecMode",
})
translateUpstreams(proxyMap)
}
}
}
}
return nil
@ -1109,6 +1173,7 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
ExecMode: api.ProxyExecMode(proxy.Proxy.ExecMode.String()),
Command: proxy.Proxy.Command,
Config: config,
Upstreams: proxy.Proxy.Upstreams.ToAPI(),
}
return contentHash, reply, nil
})

View File

@ -28,7 +28,6 @@ import (
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -75,6 +74,7 @@ func TestAgent_Services(t *testing.T) {
"foo": "bar",
},
TargetServiceID: "mysql",
Upstreams: structs.TestUpstreams(t),
}
_, err := a.State.AddProxy(prxy1, "", "")
require.NoError(t, err)
@ -88,11 +88,12 @@ func TestAgent_Services(t *testing.T) {
assert.Lenf(t, val, 1, "bad services: %v", obj)
assert.Equal(t, 5000, val["mysql"].Port)
assert.Equal(t, srv1.Meta, val["mysql"].Meta)
assert.NotNil(t, val["mysql"].Connect)
assert.NotNil(t, val["mysql"].Connect.Proxy)
require.NotNil(t, val["mysql"].Connect)
require.NotNil(t, val["mysql"].Connect.Proxy)
assert.Equal(t, prxy1.ExecMode.String(), string(val["mysql"].Connect.Proxy.ExecMode))
assert.Equal(t, prxy1.Command, val["mysql"].Connect.Proxy.Command)
assert.Equal(t, prxy1.Config, val["mysql"].Connect.Proxy.Config)
assert.Equal(t, prxy1.Upstreams.ToAPI(), val["mysql"].Connect.Proxy.Upstreams)
}
// This tests that the agent services endpoint (/v1/agent/services) returns
@ -110,7 +111,10 @@ func TestAgent_Services_ExternalConnectProxy(t *testing.T) {
ID: "db-proxy",
Service: "db-proxy",
Port: 5000,
ProxyDestination: "db",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "db",
Upstreams: structs.TestUpstreams(t),
},
}
a.State.AddService(srv1, "")
@ -121,7 +125,12 @@ func TestAgent_Services_ExternalConnectProxy(t *testing.T) {
assert.Len(val, 1)
actual := val["db-proxy"]
assert.Equal(api.ServiceKindConnectProxy, actual.Kind)
assert.Equal("db", actual.ProxyDestination)
assert.Equal(srv1.Proxy.ToAPI(), actual.Proxy)
// DEPRECATED (ProxyDestination) - remove the next comment and assertion
// Should still have deprecated ProxyDestination filled in until we remove it
// completely at a major version bump.
assert.Equal(srv1.Proxy.DestinationServiceName, actual.ProxyDestination)
}
func TestAgent_Services_ACLFilter(t *testing.T) {
@ -1376,20 +1385,66 @@ func TestAgent_RegisterService(t *testing.T) {
func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
a := NewTestAgent(t.Name(), `
connect {
proxy {
allow_managed_api_registration = true
}
}
`)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
json := `{"name":"test", "port":8000, "enable_tag_override": true, "meta": {"some": "meta"}, "weights":{"passing": 16}}`
json := `
{
"name":"test",
"port":8000,
"enable_tag_override": true,
"meta": {
"some": "meta"
},
"kind": "connect-proxy",
"proxy": {
"destination_service_name": "web",
"destination_service_id": "web",
"local_service_port": 1234,
"local_service_address": "127.0.0.1",
"upstreams": [
{
"destination_type": "service",
"destination_namespace": "default",
"destination_name": "db",
"local_bind_address": "127.0.0.1",
"local_bind_port": 1234
}
]
},
"connect": {
"proxy": {
"exec_mode": "script",
"upstreams": [
{
"destination_type": "service",
"destination_namespace": "default",
"destination_name": "db",
"local_bind_address": "127.0.0.1",
"local_bind_port": 1234
}
]
}
},
"weights":{
"passing": 16
}
}`
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", strings.NewReader(json))
obj, err := a.srv.AgentRegisterService(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if obj != nil {
t.Fatalf("bad: %v", obj)
}
rr := httptest.NewRecorder()
obj, err := a.srv.AgentRegisterService(rr, req)
require.NoError(t, err)
require.Nil(t, obj)
require.Equal(t, 200, rr.Code, "body: %s", rr.Body)
svc := &structs.NodeService{
ID: "test",
Service: "test",
@ -1397,11 +1452,40 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
Port: 8000,
EnableTagOverride: true,
Weights: &structs.Weights{Passing: 16, Warning: 0},
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 1234,
Upstreams: structs.Upstreams{
{
DestinationType: structs.UpstreamDestTypeService,
DestinationName: "db",
DestinationNamespace: "default",
LocalBindAddress: "127.0.0.1",
LocalBindPort: 1234,
},
},
},
Connect: structs.ServiceConnect{
Proxy: &structs.ServiceDefinitionConnectProxy{
ExecMode: "script",
Upstreams: structs.Upstreams{
{
DestinationType: structs.UpstreamDestTypeService,
DestinationName: "db",
DestinationNamespace: "default",
LocalBindAddress: "127.0.0.1",
LocalBindPort: 1234,
},
},
},
},
}
if got, want := a.State.Service("test"), svc; !verify.Values(t, "", got, want) {
t.Fail()
}
got := a.State.Service("test")
require.Equal(t, svc, got)
}
func TestAgent_RegisterService_ACLDeny(t *testing.T) {
@ -1500,6 +1584,8 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) {
Config: map[string]interface{}{
"foo": "bar",
},
// Includes an upstream with missing defaulted type
Upstreams: structs.TestUpstreams(t).ToAPI(),
},
},
}
@ -1520,7 +1606,7 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) {
proxySvc, ok := a.State.Services()["web-proxy"]
require.True(ok, "has proxy service")
assert.Equal(structs.ServiceKindConnectProxy, proxySvc.Kind)
assert.Equal("web", proxySvc.ProxyDestination)
assert.Equal("web", proxySvc.Proxy.DestinationServiceName)
assert.NotEmpty(proxySvc.Port, "a port should have been assigned")
// Ensure proxy itself was registered
@ -1529,6 +1615,108 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) {
assert.Equal(structs.ProxyExecModeScript, proxy.Proxy.ExecMode)
assert.Equal([]string{"proxy.sh"}, proxy.Proxy.Command)
assert.Equal(args.Connect.Proxy.Config, proxy.Proxy.Config)
// Unsure the defaulted type is explicitly filled
args.Connect.Proxy.Upstreams[0].DestinationType = api.UpstreamDestTypeService
assert.Equal(args.Connect.Proxy.Upstreams,
proxy.Proxy.Upstreams.ToAPI())
// Ensure the token was configured
assert.Equal("abc123", a.State.ServiceToken("web"))
assert.Equal("abc123", a.State.ServiceToken("web-proxy"))
}
// This tests local agent service registration with a managed proxy using
// original deprecated upstreams syntax.
func TestAgent_RegisterService_ManagedConnectProxyDeprecated(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), `
connect {
proxy {
allow_managed_api_registration = true
}
}
`)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register a proxy. Note that the destination doesn't exist here on
// this agent or in the catalog at all. This is intended and part
// of the design.
args := &api.AgentServiceRegistration{
Name: "web",
Port: 8000,
Connect: &api.AgentServiceConnect{
Proxy: &api.AgentServiceConnectProxy{
ExecMode: "script",
Command: []string{"proxy.sh"},
Config: map[string]interface{}{
"foo": "bar",
"upstreams": []interface{}{
map[string]interface{}{
"destination_name": "db",
"local_bind_port": 1234,
// this was a field for old upstreams we don't support any more.
// It should be copied into Upstreams' Config.
"connect_timeout_ms": 1000,
},
map[string]interface{}{
"destination_name": "geo-cache",
"destination_type": "prepared_query",
"local_bind_port": 1235,
},
},
},
},
},
}
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=abc123", jsonReader(args))
resp := httptest.NewRecorder()
obj, err := a.srv.AgentRegisterService(resp, req)
assert.NoError(err)
assert.Nil(obj)
require.Equal(200, resp.Code, "request failed with body: %s",
resp.Body.String())
// Ensure the target service
_, ok := a.State.Services()["web"]
assert.True(ok, "has service")
// Ensure the proxy service was registered
proxySvc, ok := a.State.Services()["web-proxy"]
require.True(ok, "has proxy service")
assert.Equal(structs.ServiceKindConnectProxy, proxySvc.Kind)
assert.Equal("web", proxySvc.Proxy.DestinationServiceName)
assert.NotEmpty(proxySvc.Port, "a port should have been assigned")
// Ensure proxy itself was registered
proxy := a.State.Proxy("web-proxy")
require.NotNil(proxy)
assert.Equal(structs.ProxyExecModeScript, proxy.Proxy.ExecMode)
assert.Equal([]string{"proxy.sh"}, proxy.Proxy.Command)
// Remove the upstreams from the args - we expect them not to show up in
// response now since that moved.
delete(args.Connect.Proxy.Config, "upstreams")
assert.Equal(args.Connect.Proxy.Config, proxy.Proxy.Config)
expectUpstreams := structs.Upstreams{
{
DestinationType: structs.UpstreamDestTypeService,
DestinationName: "db",
LocalBindPort: 1234,
Config: map[string]interface{}{
"connect_timeout_ms": float64(1000),
},
},
{
DestinationType: structs.UpstreamDestTypePreparedQuery,
DestinationName: "geo-cache",
LocalBindPort: 1235,
},
}
assert.Equal(expectUpstreams, proxy.Proxy.Upstreams)
// Ensure the token was configured
assert.Equal("abc123", a.State.ServiceToken("web"))
@ -1584,30 +1772,44 @@ func TestAgent_RegisterService_UnmanagedConnectProxy(t *testing.T) {
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register a proxy. Note that the destination doesn't exist here on
// this agent or in the catalog at all. This is intended and part
// of the design.
args := &structs.ServiceDefinition{
Kind: structs.ServiceKindConnectProxy,
// Register a proxy. Note that the destination doesn't exist here on this
// agent or in the catalog at all. This is intended and part of the design.
args := &api.AgentServiceRegistration{
Kind: api.ServiceKindConnectProxy,
Name: "connect-proxy",
Port: 8000,
ProxyDestination: "db",
Check: structs.CheckType{
TTL: 15 * time.Second,
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
ProxyDestination: "bad_destination", // Deprecated, check it's overridden
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: []api.Upstream{
{
// No type to force default
DestinationName: "db",
LocalBindPort: 1234,
},
{
DestinationType: "prepared_query",
DestinationName: "geo-cache",
LocalBindPort: 1235,
},
},
},
}
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=abc123", jsonReader(args))
resp := httptest.NewRecorder()
obj, err := a.srv.AgentRegisterService(resp, req)
assert.Nil(err)
require.NoError(t, err)
assert.Nil(obj)
// Ensure the service
svc, ok := a.State.Services()["connect-proxy"]
assert.True(ok, "has service")
assert.Equal(structs.ServiceKindConnectProxy, svc.Kind)
assert.Equal("db", svc.ProxyDestination)
// Registration must set that default type
args.Proxy.Upstreams[0].DestinationType = api.UpstreamDestTypeService
assert.Equal(args.Proxy, svc.Proxy.ToAPI())
// Ensure the token was configured
assert.Equal("abc123", a.State.ServiceToken("connect-proxy"))
@ -1627,7 +1829,9 @@ func TestAgent_RegisterService_UnmanagedConnectProxyInvalid(t *testing.T) {
args := &structs.ServiceDefinition{
Kind: structs.ServiceKindConnectProxy,
Name: "connect-proxy",
ProxyDestination: "db",
Proxy: &structs.ConnectProxyConfig{
DestinationServiceName: "db",
},
Check: structs.CheckType{
TTL: 15 * time.Second,
},
@ -2982,10 +3186,12 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) {
Config: map[string]interface{}{
"bind_port": 1234,
"connect_timeout_ms": 500,
// Specify upstreams in deprecated nested config way here. We test the
// new way in the update case below.
"upstreams": []map[string]interface{}{
{
"destination_name": "db",
"local_port": 3131,
"local_bind_port": 3131,
},
},
},
@ -2997,34 +3203,36 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) {
ProxyServiceID: "test-proxy",
TargetServiceID: "test",
TargetServiceName: "test",
ContentHash: "4662e51e78609569",
ContentHash: "a7c93585b6d70445",
ExecMode: "daemon",
Command: []string{"tubes.sh"},
Config: map[string]interface{}{
"upstreams": []interface{}{
map[string]interface{}{
"destination_name": "db",
"local_port": float64(3131),
},
},
"bind_address": "127.0.0.1",
"local_service_address": "127.0.0.1:8000",
"bind_port": int(1234),
"connect_timeout_ms": float64(500),
},
Upstreams: []api.Upstream{
{
DestinationType: "service",
DestinationName: "db",
LocalBindPort: 3131,
},
},
}
ur, err := copystructure.Copy(expectedResponse)
require.NoError(t, err)
updatedResponse := ur.(*api.ConnectProxyConfig)
updatedResponse.ContentHash = "23b5b6b3767601e1"
upstreams := updatedResponse.Config["upstreams"].([]interface{})
upstreams = append(upstreams,
map[string]interface{}{
"destination_name": "cache",
"local_port": float64(4242),
updatedResponse.ContentHash = "aedc0ca0f3f7794e"
updatedResponse.Upstreams = append(updatedResponse.Upstreams, api.Upstream{
DestinationType: "service",
DestinationName: "cache",
LocalBindPort: 4242,
Config: map[string]interface{}{
"connect_timeout_ms": float64(1000),
},
})
updatedResponse.Config["upstreams"] = upstreams
tests := []struct {
name string
@ -3066,7 +3274,7 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) {
r2, err := copystructure.Copy(reg)
require.NoError(t, err)
reg2 := r2.(*structs.ServiceDefinition)
reg2.Connect.Proxy.Config = updatedResponse.Config
reg2.Connect.Proxy.Upstreams = structs.UpstreamsFromAPI(updatedResponse.Upstreams)
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(r2))
resp := httptest.NewRecorder()
_, err = a.srv.AgentRegisterService(resp, req)

View File

@ -273,7 +273,7 @@ func TestCatalogNodes_Blocking(t *testing.T) {
// an error channel instead.
errch := make(chan error, 2)
go func() {
testrpc.WaitForLeader(t, a.RPC, "dc1")
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
start := time.Now()
// register a service after the blocking call
@ -846,6 +846,9 @@ func TestCatalogServiceNodes_ConnectProxy(t *testing.T) {
nodes := obj.(structs.ServiceNodes)
assert.Len(nodes, 1)
assert.Equal(structs.ServiceKindConnectProxy, nodes[0].ServiceKind)
assert.Equal(args.Service.Proxy, nodes[0].ServiceProxy)
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
assert.Equal(args.Service.Proxy.DestinationServiceName, nodes[0].ServiceProxyDestination)
}
// Test that the Connect-compatible endpoints can be queried for a
@ -864,7 +867,7 @@ func TestCatalogConnectServiceNodes_good(t *testing.T) {
assert.Nil(a.RPC("Catalog.Register", args, &out))
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/catalog/connect/%s", args.Service.ProxyDestination), nil)
"/v1/catalog/connect/%s", args.Service.Proxy.DestinationServiceName), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogConnectServiceNodes(resp, req)
assert.Nil(err)
@ -874,6 +877,7 @@ func TestCatalogConnectServiceNodes_good(t *testing.T) {
assert.Len(nodes, 1)
assert.Equal(structs.ServiceKindConnectProxy, nodes[0].ServiceKind)
assert.Equal(args.Service.Address, nodes[0].ServiceAddress)
assert.Equal(args.Service.Proxy, nodes[0].ServiceProxy)
}
func TestCatalogNodeServices(t *testing.T) {
@ -882,7 +886,7 @@ func TestCatalogNodeServices(t *testing.T) {
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register node
// Register node with a regular service and connect proxy
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
@ -898,6 +902,10 @@ func TestCatalogNodeServices(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Register a connect proxy
args.Service = structs.TestNodeServiceProxy(t)
require.NoError(t, a.RPC("Catalog.Register", args, &out))
req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc1", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogNodeServices(resp, req)
@ -907,9 +915,12 @@ func TestCatalogNodeServices(t *testing.T) {
assertIndex(t, resp)
services := obj.(*structs.NodeServices)
if len(services.Services) != 1 {
if len(services.Services) != 2 {
t.Fatalf("bad: %v", obj)
}
// Proxy service should have it's config intact
require.Equal(t, args.Service.Proxy, services.Services["web-proxy"].Proxy)
}
// Test that the services on a node contain all the Connect proxies on

View File

@ -1111,7 +1111,9 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition {
EnableTagOverride: b.boolVal(v.EnableTagOverride),
Weights: serviceWeights,
Checks: checks,
ProxyDestination: b.stringVal(v.ProxyDestination),
// DEPRECATED (ProxyDestination) - don't populate deprecated field, just use
// it as a default below on read. Remove that when remofing ProxyDestination
Proxy: b.serviceProxyVal(v.Proxy, v.ProxyDestination),
Connect: b.serviceConnectVal(v.Connect),
}
}
@ -1128,6 +1130,45 @@ func (b *Builder) serviceKindVal(v *string) structs.ServiceKind {
}
}
func (b *Builder) serviceProxyVal(v *ServiceProxy, deprecatedDest *string) *structs.ConnectProxyConfig {
if v == nil {
if deprecatedDest != nil {
return &structs.ConnectProxyConfig{
DestinationServiceName: b.stringVal(deprecatedDest),
}
}
return nil
}
return &structs.ConnectProxyConfig{
DestinationServiceName: b.stringVal(v.DestinationServiceName),
DestinationServiceID: b.stringVal(v.DestinationServiceID),
LocalServiceAddress: b.stringVal(v.LocalServiceAddress),
LocalServicePort: b.intVal(v.LocalServicePort),
Config: v.Config,
Upstreams: b.upstreamsVal(v.Upstreams),
}
}
func (b *Builder) upstreamsVal(v []Upstream) structs.Upstreams {
ups := make(structs.Upstreams, len(v))
for i, u := range v {
ups[i] = structs.Upstream{
DestinationType: b.stringVal(u.DestinationType),
DestinationNamespace: b.stringVal(u.DestinationNamespace),
DestinationName: b.stringVal(u.DestinationName),
Datacenter: b.stringVal(u.Datacenter),
LocalBindAddress: b.stringVal(u.LocalBindAddress),
LocalBindPort: b.intVal(u.LocalBindPort),
Config: u.Config,
}
if ups[i].DestinationType == "" {
ups[i].DestinationType = structs.UpstreamDestTypeService
}
}
return ups
}
func (b *Builder) serviceConnectVal(v *ServiceConnect) *structs.ServiceConnect {
if v == nil {
return nil
@ -1139,6 +1180,7 @@ func (b *Builder) serviceConnectVal(v *ServiceConnect) *structs.ServiceConnect {
ExecMode: b.stringVal(v.Proxy.ExecMode),
Command: v.Proxy.Command,
Config: v.Proxy.Config,
Upstreams: b.upstreamsVal(v.Proxy.Upstreams),
}
}

View File

@ -84,8 +84,12 @@ func Parse(data string, format string) (c Config, err error) {
"services",
"services.checks",
"watches",
"service.connect.proxy.config.upstreams",
"services.connect.proxy.config.upstreams",
"service.connect.proxy.config.upstreams", // Deprecated
"services.connect.proxy.config.upstreams", // Deprecated
"service.connect.proxy.upstreams",
"services.connect.proxy.upstreams",
"service.proxy.upstreams",
"services.proxy.upstreams",
})
// There is a difference of representation of some fields depending on
@ -341,7 +345,9 @@ type ServiceDefinition struct {
Token *string `json:"token,omitempty" hcl:"token" mapstructure:"token"`
Weights *ServiceWeights `json:"weights,omitempty" hcl:"weights" mapstructure:"weights"`
EnableTagOverride *bool `json:"enable_tag_override,omitempty" hcl:"enable_tag_override" mapstructure:"enable_tag_override"`
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
ProxyDestination *string `json:"proxy_destination,omitempty" hcl:"proxy_destination" mapstructure:"proxy_destination"`
Proxy *ServiceProxy `json:"proxy,omitempty" hcl:"proxy" mapstructure:"proxy"`
Connect *ServiceConnect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"`
}
@ -383,6 +389,78 @@ type ServiceConnectProxy struct {
Command []string `json:"command,omitempty" hcl:"command" mapstructure:"command"`
ExecMode *string `json:"exec_mode,omitempty" hcl:"exec_mode" mapstructure:"exec_mode"`
Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"`
Upstreams []Upstream `json:"upstreams,omitempty" hcl:"upstreams" mapstructure:"upstreams"`
}
// ServiceProxy is the additional config needed for a Kind = connect-proxy
// registration.
type ServiceProxy struct {
// DestinationServiceName is required and is the name of the service to accept
// traffic for.
DestinationServiceName *string `json:"destination_service_name,omitempty" hcl:"destination_service_name" mapstructure:"destination_service_name"`
// DestinationServiceID is optional and should only be specified for
// "side-car" style proxies where the proxy is in front of just a single
// instance of the service. It should be set to the service ID of the instance
// being represented which must be registered to the same agent. It's valid to
// provide a service ID that does not yet exist to avoid timing issues when
// bootstrapping a service with a proxy.
DestinationServiceID *string `json:"destination_service_id,omitempty" hcl:"destination_service_id" mapstructure:"destination_service_id"`
// LocalServiceAddress is the address of the local service instance. It is
// optional and should only be specified for "side-car" style proxies. It will
// default to 127.0.0.1 if the proxy is a "side-car" (DestinationServiceID is
// set) but otherwise will be ignored.
LocalServiceAddress *string `json:"local_service_address,omitempty" hcl:"local_service_address" mapstructure:"local_service_address"`
// LocalServicePort is the port of the local service instance. It is optional
// and should only be specified for "side-car" style proxies. It will default
// to the registered port for the instance if the proxy is a "side-car"
// (DestinationServiceID is set) but otherwise will be ignored.
LocalServicePort *int `json:"local_service_port,omitempty" hcl:"local_service_port" mapstructure:"local_service_port"`
// Config is the arbitrary configuration data provided with the proxy
// registration.
Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"`
// Upstreams describes any upstream dependencies the proxy instance should
// setup.
Upstreams []Upstream `json:"upstreams,omitempty" hcl:"upstreams" mapstructure:"upstreams"`
}
// Upstream represents a single upstream dependency for a service or proxy. It
// describes the mechanism used to discover instances to communicate with (the
// Target) as well as any potential client configuration that may be useful such
// as load balancer options, timeouts etc.
type Upstream struct {
// Destination fields are the required ones for determining what this upstream
// points to. Depending on DestinationType some other fields below might
// further restrict the set of instances allowable.
//
// DestinationType would be better as an int constant but even with custom
// JSON marshallers it causes havoc with all the mapstructure mangling we do
// on service definitions in various places.
DestinationType *string `json:"destination_type,omitempty" hcl:"destination_type" mapstructure:"destination_type"`
DestinationNamespace *string `json:"destination_namespace,omitempty" hcl:"destination_namespace" mapstructure:"destination_namespace"`
DestinationName *string `json:"destination_name,omitempty" hcl:"destination_name" mapstructure:"destination_name"`
// Datacenter that the service discovery request should be run against. Note
// for prepared queries, the actual results might be from a different
// datacenter.
Datacenter *string `json:"datacenter,omitempty" hcl:"datacenter" mapstructure:"datacenter"`
// LocalBindAddress is the ip address a side-car proxy should listen on for
// traffic destined for this upstream service. Default if empty is 127.0.0.1.
LocalBindAddress *string `json:"local_bind_address,omitempty" hcl:"local_bind_address" mapstructure:"local_bind_address"`
// LocalBindPort is the ip address a side-car proxy should listen on for traffic
// destined for this upstream service. Required.
LocalBindPort *int `json:"local_bind_port,omitempty" hcl:"local_bind_port" mapstructure:"local_bind_port"`
// Config is an opaque config that is specific to the proxy process being run.
// It can be used to pass abritrary configuration for this specific upstream
// to the proxy.
Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"`
}
// Connect is the agent-global connect configuration.

View File

@ -2140,53 +2140,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
{
desc: "HCL service managed proxy 'upstreams'",
args: []string{
`-data-dir=` + dataDir,
},
hcl: []string{
`service {
name = "web"
port = 8080
connect {
proxy {
config {
upstreams {
local_bind_port = 1234
}
}
}
}
}`,
},
skipformat: true, // skipping JSON cause we get slightly diff types (okay)
patch: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
rt.Services = []*structs.ServiceDefinition{
&structs.ServiceDefinition{
Name: "web",
Port: 8080,
Connect: &structs.ServiceConnect{
Proxy: &structs.ServiceDefinitionConnectProxy{
Config: map[string]interface{}{
"upstreams": []map[string]interface{}{
map[string]interface{}{
"local_bind_port": 1234,
},
},
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
}
},
},
{
desc: "JSON service managed proxy 'upstreams'",
desc: "Service managed proxy 'upstreams'",
args: []string{
`-data-dir=` + dataDir,
},
@ -2197,17 +2151,29 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
"port": 8080,
"connect": {
"proxy": {
"config": {
"upstreams": [{
"destination_name": "db",
"local_bind_port": 1234
}]
}
}
}
}`,
},
hcl: []string{
`service {
name = "web"
port = 8080
connect {
proxy {
upstreams {
destination_name = "db"
local_bind_port = 1234
}
}
}
}`,
},
skipformat: true, // skipping HCL cause we get slightly diff types (okay)
patch: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
rt.Services = []*structs.ServiceDefinition{
@ -2216,11 +2182,11 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
Port: 8080,
Connect: &structs.ServiceConnect{
Proxy: &structs.ServiceDefinitionConnectProxy{
Config: map[string]interface{}{
"upstreams": []interface{}{
map[string]interface{}{
"local_bind_port": float64(1234),
},
Upstreams: structs.Upstreams{
{
DestinationName: "db",
DestinationType: structs.UpstreamDestTypeService,
LocalBindPort: 1234,
},
},
},
@ -2235,35 +2201,49 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
{
desc: "JSON multiple services managed proxy 'upstreams'",
desc: "Multiple service managed proxy 'upstreams'",
args: []string{
`-data-dir=` + dataDir,
},
json: []string{
`{
"services": [{
"service": {
"name": "web",
"port": 8080,
"connect": {
"proxy": {
"config": {
"upstreams": [{
"destination_name": "db",
"local_bind_port": 1234
}, {
"destination_name": "cache",
"local_bind_port": 2345
}]
}
}
}
},{
"name": "service-A2",
"port": 81,
"tags": [],
"checks": []
}]
}`,
},
skipformat: true, // skipping HCL cause we get slightly diff types (okay)
hcl: []string{
`service {
name = "web"
port = 8080
connect {
proxy {
upstreams = [
{
destination_name = "db"
local_bind_port = 1234
},
{
destination_name = "cache"
local_bind_port = 2345
}
]
}
}
}`,
},
patch: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
rt.Services = []*structs.ServiceDefinition{
@ -2272,26 +2252,20 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
Port: 8080,
Connect: &structs.ServiceConnect{
Proxy: &structs.ServiceDefinitionConnectProxy{
Config: map[string]interface{}{
"upstreams": []interface{}{
map[string]interface{}{
"local_bind_port": float64(1234),
Upstreams: structs.Upstreams{
{
DestinationName: "db",
DestinationType: structs.UpstreamDestTypeService,
LocalBindPort: 1234,
},
map[string]interface{}{
"local_bind_port": float64(2345),
{
DestinationName: "cache",
DestinationType: structs.UpstreamDestTypeService,
LocalBindPort: 2345,
},
},
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
&structs.ServiceDefinition{
Name: "service-A2",
Port: 81,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
@ -2946,10 +2920,34 @@ func TestFullConfig(t *testing.T) {
},
{
"id": "Kh81CPF6",
"kind": "connect-proxy",
"name": "Kh81CPF6-proxy",
"port": 31471,
"kind": "connect-proxy",
"proxy_destination": "6L6BVfgH"
"proxy": {
"config": {
"cedGGtZf": "pWrUNiWw"
},
"destination_service_id": "6L6BVfgH-id",
"destination_service_name": "6L6BVfgH",
"local_service_address": "127.0.0.2",
"local_service_port": 23759,
"upstreams": [
{
"destination_name": "KPtAj2cb",
"local_bind_port": 4051,
"config": {
"kzRnZOyd": "nUNKoL8H"
}
},
{
"destination_name": "KSd8HsRl",
"destination_namespace": "9nakw0td",
"destination_type": "prepared_query",
"local_bind_address": "127.24.88.0",
"local_bind_port": 11884
}
]
}
}
],
"session_ttl_min": "26627s",
@ -3448,7 +3446,31 @@ func TestFullConfig(t *testing.T) {
name = "Kh81CPF6-proxy"
port = 31471
kind = "connect-proxy"
proxy_destination = "6L6BVfgH"
proxy {
destination_service_name = "6L6BVfgH"
destination_service_id = "6L6BVfgH-id"
local_service_address = "127.0.0.2"
local_service_port = 23759
config {
cedGGtZf = "pWrUNiWw"
}
upstreams = [
{
destination_name = "KPtAj2cb"
local_bind_port = 4051
config {
kzRnZOyd = "nUNKoL8H"
}
},
{
destination_type = "prepared_query"
destination_namespace = "9nakw0td"
destination_name = "KSd8HsRl"
local_bind_port = 11884
local_bind_address = "127.24.88.0"
},
]
}
}
]
session_ttl_min = "26627s"
@ -3946,7 +3968,32 @@ func TestFullConfig(t *testing.T) {
Name: "Kh81CPF6-proxy",
Port: 31471,
Kind: "connect-proxy",
ProxyDestination: "6L6BVfgH",
Proxy: &structs.ConnectProxyConfig{
DestinationServiceName: "6L6BVfgH",
DestinationServiceID: "6L6BVfgH-id",
LocalServiceAddress: "127.0.0.2",
LocalServicePort: 23759,
Config: map[string]interface{}{
"cedGGtZf": "pWrUNiWw",
},
Upstreams: structs.Upstreams{
{
DestinationType: "service", // Default should be explicitly filled
DestinationName: "KPtAj2cb",
LocalBindPort: 4051,
Config: map[string]interface{}{
"kzRnZOyd": "nUNKoL8H",
},
},
{
DestinationType: "prepared_query",
DestinationNamespace: "9nakw0td",
DestinationName: "KSd8HsRl",
LocalBindPort: 11884,
LocalBindAddress: "127.24.88.0",
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
@ -4612,6 +4659,7 @@ func TestSanitize(t *testing.T) {
"Meta": {},
"Name": "foo",
"Port": 0,
"Proxy": null,
"ProxyDestination": "",
"Tags": [],
"Token": "hidden",

View File

@ -83,7 +83,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
// Proxies must have write permission on their destination
if args.Service.Kind == structs.ServiceKindConnectProxy {
if rule != nil && !rule.ServiceWrite(args.Service.ProxyDestination, nil) {
if rule != nil && !rule.ServiceWrite(args.Service.Proxy.DestinationServiceName, nil) {
return acl.ErrPermissionDenied
}
}

View File

@ -359,7 +359,40 @@ func TestCatalog_Register_ConnectProxy(t *testing.T) {
assert.Len(resp.ServiceNodes, 1)
v := resp.ServiceNodes[0]
assert.Equal(structs.ServiceKindConnectProxy, v.ServiceKind)
assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination)
assert.Equal(args.Service.Proxy.DestinationServiceName, v.ServiceProxy.DestinationServiceName)
}
// DEPRECATED (ProxyDestination) - remove this whole test case when removing
// ProxyDestination
func TestCatalog_Register_DeprecatedConnectProxy(t *testing.T) {
t.Parallel()
assert := assert.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
args := structs.TestRegisterRequestProxy(t)
args.Service.ProxyDestination = "legacy"
args.Service.Proxy = structs.ConnectProxyConfig{}
// Register
var out struct{}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// List
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: args.Service.Service,
}
var resp structs.IndexedServiceNodes
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
assert.Len(resp.ServiceNodes, 1)
v := resp.ServiceNodes[0]
assert.Equal(structs.ServiceKindConnectProxy, v.ServiceKind)
assert.Equal(args.Service.ProxyDestination, v.ServiceProxy.DestinationServiceName)
}
// Test an invalid ConnectProxy. We don't need to exhaustively test because
@ -375,13 +408,13 @@ func TestCatalog_Register_ConnectProxy_invalid(t *testing.T) {
defer codec.Close()
args := structs.TestRegisterRequestProxy(t)
args.Service.ProxyDestination = ""
args.Service.Proxy.DestinationServiceName = ""
// Register
var out struct{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)
assert.NotNil(err)
assert.Contains(err.Error(), "ProxyDestination")
assert.Contains(err.Error(), "DestinationServiceName")
}
// Test that write is required for the proxy destination to register a proxy.
@ -422,7 +455,7 @@ service "foo" {
// Register should fail because we don't have permission on the destination
args := structs.TestRegisterRequestProxy(t)
args.Service.Service = "foo"
args.Service.ProxyDestination = "bar"
args.Service.Proxy.DestinationServiceName = "bar"
args.WriteRequest.Token = token
var out struct{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)
@ -431,7 +464,7 @@ service "foo" {
// Register should fail with the right destination but wrong name
args = structs.TestRegisterRequestProxy(t)
args.Service.Service = "bar"
args.Service.ProxyDestination = "foo"
args.Service.Proxy.DestinationServiceName = "foo"
args.WriteRequest.Token = token
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)
assert.True(acl.IsErrPermissionDenied(err))
@ -439,7 +472,7 @@ service "foo" {
// Register should work with the right destination
args = structs.TestRegisterRequestProxy(t)
args.Service.Service = "foo"
args.Service.ProxyDestination = "foo"
args.Service.Proxy.DestinationServiceName = "foo"
args.WriteRequest.Token = token
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
}
@ -1812,7 +1845,7 @@ func TestCatalog_ListServiceNodes_ConnectProxy(t *testing.T) {
assert.Len(resp.ServiceNodes, 1)
v := resp.ServiceNodes[0]
assert.Equal(structs.ServiceKindConnectProxy, v.ServiceKind)
assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination)
assert.Equal(args.Service.Proxy.DestinationServiceName, v.ServiceProxy.DestinationServiceName)
}
func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) {
@ -1834,7 +1867,7 @@ func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) {
// Register the service
{
dst := args.Service.ProxyDestination
dst := args.Service.Proxy.DestinationServiceName
args := structs.TestRegisterRequest(t)
args.Service.Service = dst
var out struct{}
@ -1845,25 +1878,25 @@ func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) {
req := structs.ServiceSpecificRequest{
Connect: true,
Datacenter: "dc1",
ServiceName: args.Service.ProxyDestination,
ServiceName: args.Service.Proxy.DestinationServiceName,
}
var resp structs.IndexedServiceNodes
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
assert.Len(resp.ServiceNodes, 1)
v := resp.ServiceNodes[0]
assert.Equal(structs.ServiceKindConnectProxy, v.ServiceKind)
assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination)
assert.Equal(args.Service.Proxy.DestinationServiceName, v.ServiceProxy.DestinationServiceName)
// List by non-Connect
req = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: args.Service.ProxyDestination,
ServiceName: args.Service.Proxy.DestinationServiceName,
}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
assert.Len(resp.ServiceNodes, 1)
v = resp.ServiceNodes[0]
assert.Equal(args.Service.ProxyDestination, v.ServiceName)
assert.Equal("", v.ServiceProxyDestination)
assert.Equal(args.Service.Proxy.DestinationServiceName, v.ServiceName)
assert.Equal("", v.ServiceProxy.DestinationServiceName)
}
// Test that calling ServiceNodes with Connect: true will return
@ -1947,7 +1980,7 @@ service "foo" {
// Register a proxy
args := structs.TestRegisterRequestProxy(t)
args.Service.Service = "foo-proxy"
args.Service.ProxyDestination = "bar"
args.Service.Proxy.DestinationServiceName = "bar"
args.WriteRequest.Token = "root"
var out struct{}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
@ -1955,14 +1988,14 @@ service "foo" {
// Register a proxy
args = structs.TestRegisterRequestProxy(t)
args.Service.Service = "foo-proxy"
args.Service.ProxyDestination = "foo"
args.Service.Proxy.DestinationServiceName = "foo"
args.WriteRequest.Token = "root"
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a proxy
args = structs.TestRegisterRequestProxy(t)
args.Service.Service = "another-proxy"
args.Service.ProxyDestination = "foo"
args.Service.Proxy.DestinationServiceName = "foo"
args.WriteRequest.Token = "root"
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
}
@ -2102,7 +2135,7 @@ func TestCatalog_NodeServices_ConnectProxy(t *testing.T) {
assert.Len(resp.NodeServices.Services, 1)
v := resp.NodeServices.Services[args.Service.Service]
assert.Equal(structs.ServiceKindConnectProxy, v.Kind)
assert.Equal(args.Service.ProxyDestination, v.ProxyDestination)
assert.Equal(args.Service.Proxy.DestinationServiceName, v.Proxy.DestinationServiceName)
}
func TestCatalog_NodeServices_ConnectNative(t *testing.T) {

View File

@ -865,7 +865,7 @@ service "foo" {
args.WriteRequest.Token = "root"
args.Service.ID = "foo-proxy-0"
args.Service.Service = "foo-proxy"
args.Service.ProxyDestination = "bar"
args.Service.Proxy.DestinationServiceName = "bar"
args.Check = &structs.HealthCheck{
Name: "proxy",
Status: api.HealthPassing,
@ -877,7 +877,7 @@ service "foo" {
args = structs.TestRegisterRequestProxy(t)
args.WriteRequest.Token = "root"
args.Service.Service = "foo-proxy"
args.Service.ProxyDestination = "foo"
args.Service.Proxy.DestinationServiceName = "foo"
args.Check = &structs.HealthCheck{
Name: "proxy",
Status: api.HealthPassing,
@ -889,7 +889,7 @@ service "foo" {
args = structs.TestRegisterRequestProxy(t)
args.WriteRequest.Token = "root"
args.Service.Service = "another-proxy"
args.Service.ProxyDestination = "foo"
args.Service.Proxy.DestinationServiceName = "foo"
args.Check = &structs.HealthCheck{
Name: "proxy",
Status: api.HealthPassing,

View File

@ -2652,7 +2652,7 @@ func TestPreparedQuery_Execute_ConnectExact(t *testing.T) {
case 2:
// Connect proxy
req.Service.Kind = structs.ServiceKindConnectProxy
req.Service.ProxyDestination = req.Service.Service
req.Service.Proxy.DestinationServiceName = req.Service.Service
req.Service.Service = "proxy"
}
@ -2727,7 +2727,7 @@ func TestPreparedQuery_Execute_ConnectExact(t *testing.T) {
require.Equal(reply.Service, reply.Nodes[0].Service.Service)
require.Equal(structs.ServiceKindConnectProxy, reply.Nodes[1].Service.Kind)
require.Equal(reply.Service, reply.Nodes[1].Service.ProxyDestination)
require.Equal(reply.Service, reply.Nodes[1].Service.Proxy.DestinationServiceName)
}
// Update the query
@ -2762,7 +2762,7 @@ func TestPreparedQuery_Execute_ConnectExact(t *testing.T) {
require.Equal(reply.Service, reply.Nodes[0].Service.Service)
require.Equal(structs.ServiceKindConnectProxy, reply.Nodes[1].Service.Kind)
require.Equal(reply.Service, reply.Nodes[1].Service.ProxyDestination)
require.Equal(reply.Service, reply.Nodes[1].Service.Proxy.DestinationServiceName)
}
// Unset the query

View File

@ -1433,11 +1433,11 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) {
Service: "connect-proxy",
Address: "1.1.1.1",
Port: 1111,
ProxyDestination: "foo",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "foo"},
}
// Service successfully registers into the state store.
@ -2032,8 +2032,8 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) {
assert.Nil(s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
assert.Nil(s.EnsureService(12, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
assert.Nil(s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000}))
assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000}))
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "native-db", Service: "db", Connect: structs.ServiceConnect{Native: true}}))
assert.Nil(s.EnsureService(17, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}))
assert.True(watchFired(ws))
@ -2994,8 +2994,8 @@ func TestStateStore_CheckConnectServiceNodes(t *testing.T) {
assert.Nil(s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
assert.Nil(s.EnsureService(12, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
assert.Nil(s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000}))
assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000}))
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}))
assert.True(watchFired(ws))
@ -3016,7 +3016,7 @@ func TestStateStore_CheckConnectServiceNodes(t *testing.T) {
for _, n := range nodes {
assert.Equal(structs.ServiceKindConnectProxy, n.Service.Kind)
assert.Equal("db", n.Service.ProxyDestination)
assert.Equal("db", n.Service.Proxy.DestinationServiceName)
}
}

View File

@ -23,7 +23,7 @@ func (idx *IndexConnectService) FromObject(obj interface{}) (bool, []byte, error
switch {
case sn.ServiceKind == structs.ServiceKindConnectProxy:
// For proxies, this service supports Connect for the destination
result = []byte(strings.ToLower(sn.ServiceProxyDestination))
result = []byte(strings.ToLower(sn.ServiceProxy.DestinationServiceName))
case sn.ServiceConnect.Native:
// For native, this service supports Connect directly

View File

@ -49,7 +49,7 @@ func TestIndexConnectService_FromObject(t *testing.T) {
&structs.ServiceNode{
ServiceKind: structs.ServiceKindConnectProxy,
ServiceName: "db",
ServiceProxyDestination: "fOo",
ServiceProxy: structs.ConnectProxyConfig{DestinationServiceName: "fOo"},
},
true,
[]byte("foo\x00"),

View File

@ -1544,7 +1544,7 @@ func TestDNS_ConnectServiceLookup(t *testing.T) {
{
args := structs.TestRegisterRequestProxy(t)
args.Address = "127.0.0.55"
args.Service.ProxyDestination = "db"
args.Service.Proxy.DestinationServiceName = "db"
args.Service.Address = ""
args.Service.Port = 12345
var out struct{}

View File

@ -856,7 +856,7 @@ func TestHealthConnectServiceNodes(t *testing.T) {
// Request
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?dc=dc1", args.Service.ProxyDestination), nil)
"/v1/health/connect/%s?dc=dc1", args.Service.Proxy.DestinationServiceName), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(err)
@ -888,7 +888,7 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
t.Run("bc_no_query_value", func(t *testing.T) {
assert := assert.New(t)
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing", args.Service.ProxyDestination), nil)
"/v1/health/connect/%s?passing", args.Service.Proxy.DestinationServiceName), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(err)
@ -902,7 +902,7 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
t.Run("passing_true", func(t *testing.T) {
assert := assert.New(t)
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing=true", args.Service.ProxyDestination), nil)
"/v1/health/connect/%s?passing=true", args.Service.Proxy.DestinationServiceName), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(err)
@ -916,7 +916,7 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
t.Run("passing_false", func(t *testing.T) {
assert := assert.New(t)
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing=false", args.Service.ProxyDestination), nil)
"/v1/health/connect/%s?passing=false", args.Service.Proxy.DestinationServiceName), nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthConnectServiceNodes(resp, req)
assert.Nil(err)
@ -930,7 +930,7 @@ func TestHealthConnectServiceNodes_PassingFilter(t *testing.T) {
t.Run("passing_bad", func(t *testing.T) {
assert := assert.New(t)
req, _ := http.NewRequest("GET", fmt.Sprintf(
"/v1/health/connect/%s?passing=nope-nope", args.Service.ProxyDestination), nil)
"/v1/health/connect/%s?passing=nope-nope", args.Service.Proxy.DestinationServiceName), nil)
resp := httptest.NewRecorder()
a.srv.HealthConnectServiceNodes(resp, req)
assert.Equal(400, resp.Code)

View File

@ -662,7 +662,9 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token,
Kind: structs.ServiceKindConnectProxy,
ID: target.ID + "-proxy",
Service: target.Service + "-proxy",
ProxyDestination: target.Service,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: target.Service,
},
Address: cfg.BindAddress,
Port: cfg.BindPort,
}

View File

@ -280,7 +280,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
ID: "mysql-proxy",
Service: "mysql-proxy",
Port: 5000,
ProxyDestination: "db",
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
@ -296,7 +296,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Service: "redis-proxy",
Port: 8000,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "redis",
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "redis"},
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
@ -316,7 +316,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Service: "web-proxy",
Port: 80,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "web",
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "web"},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
@ -330,7 +330,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Service: "lb-proxy",
Port: 443,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "lb",
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "lb"},
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
@ -345,7 +345,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Service: "cache-proxy",
Port: 11211,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "cache-proxy",
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "cache-proxy"},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
@ -1862,7 +1862,7 @@ func TestStateProxyManagement(t *testing.T) {
assert.Equal("web-proxy", svc.ID)
assert.Equal("web-proxy", svc.Service)
assert.Equal(structs.ServiceKindConnectProxy, svc.Kind)
assert.Equal("web", svc.ProxyDestination)
assert.Equal("web", svc.Proxy.DestinationServiceName)
assert.Equal("", svc.Address, "should have empty address by default")
// Port is non-deterministic but could be either of 20000 or 20001
assert.Contains([]int{20000, 20001}, svc.Port)
@ -1878,7 +1878,7 @@ func TestStateProxyManagement(t *testing.T) {
assert.Equal("web-proxy", svcDup.ID)
assert.Equal("web-proxy", svcDup.Service)
assert.Equal(structs.ServiceKindConnectProxy, svcDup.Kind)
assert.Equal("web", svcDup.ProxyDestination)
assert.Equal("web", svcDup.Proxy.DestinationServiceName)
assert.Equal("", svcDup.Address, "should have empty address by default")
// Port must be same as before
assert.Equal(svc.Port, svcDup.Port)

View File

@ -92,6 +92,9 @@ type ConnectManagedProxy struct {
// Config is the arbitrary configuration data provided with the registration.
Config map[string]interface{}
// Upstreams are the dependencies the proxy should setup outgoing listners for.
Upstreams Upstreams
// ProxyService is a pointer to the local proxy's service record for
// convenience. The proxies ID and name etc. can be read from there. It may be
// nil if the agent is starting up and hasn't registered the service yet. We

View File

@ -0,0 +1,171 @@
package structs
import (
"fmt"
"github.com/hashicorp/consul/api"
)
// ConnectProxyConfig describes the configuration needed for any proxy managed
// or unmanaged. It describes a single logical service's listener and optionally
// upstreams and sidecar-related config for a single instance. To describe a
// centralised proxy that routed traffic for multiple services, a different one
// of these would be needed for each, sharing the same LogicalProxyID.
type ConnectProxyConfig struct {
// DestinationServiceName is required and is the name of the service to accept
// traffic for.
DestinationServiceName string
// DestinationServiceID is optional and should only be specified for
// "side-car" style proxies where the proxy is in front of just a single
// instance of the service. It should be set to the service ID of the instance
// being represented which must be registered to the same agent. It's valid to
// provide a service ID that does not yet exist to avoid timing issues when
// bootstrapping a service with a proxy.
DestinationServiceID string
// LocalServiceAddress is the address of the local service instance. It is
// optional and should only be specified for "side-car" style proxies. It will
// default to 127.0.0.1 if the proxy is a "side-car" (DestinationServiceID is
// set) but otherwise will be ignored.
LocalServiceAddress string
// LocalServicePort is the port of the local service instance. It is optional
// and should only be specified for "side-car" style proxies. It will default
// to the registered port for the instance if the proxy is a "side-car"
// (DestinationServiceID is set) but otherwise will be ignored.
LocalServicePort int
// Config is the arbitrary configuration data provided with the proxy
// registration.
Config map[string]interface{}
// Upstreams describes any upstream dependencies the proxy instance should
// setup.
Upstreams Upstreams
}
// ToAPI returns the api struct with the same fields. We have duplicates to
// avoid the api package depending on this one which imports a ton of Consul's
// core which you don't want if you are just trying to use our client in your
// app.
func (c *ConnectProxyConfig) ToAPI() *api.AgentServiceConnectProxyConfig {
return &api.AgentServiceConnectProxyConfig{
DestinationServiceName: c.DestinationServiceName,
DestinationServiceID: c.DestinationServiceID,
LocalServiceAddress: c.LocalServiceAddress,
LocalServicePort: c.LocalServicePort,
Config: c.Config,
Upstreams: c.Upstreams.ToAPI(),
}
}
const (
UpstreamDestTypeService = "service"
UpstreamDestTypePreparedQuery = "prepared_query"
)
// Upstreams is a list of upstreams. Aliased to allow ToAPI method.
type Upstreams []Upstream
// ToAPI returns the api structs with the same fields. We have duplicates to
// avoid the api package depending on this one which imports a ton of Consul's
// core which you don't want if you are just trying to use our client in your
// app.
func (us Upstreams) ToAPI() []api.Upstream {
a := make([]api.Upstream, len(us))
for i, u := range us {
a[i] = u.ToAPI()
}
return a
}
// UpstreamsFromAPI is a helper for converting api.Upstream to Upstream.
func UpstreamsFromAPI(us []api.Upstream) Upstreams {
a := make([]Upstream, len(us))
for i, u := range us {
a[i] = UpstreamFromAPI(u)
}
return a
}
// Upstream represents a single upstream dependency for a service or proxy. It
// describes the mechanism used to discover instances to communicate with (the
// Target) as well as any potential client configuration that may be useful such
// as load balancer options, timeouts etc.
type Upstream struct {
// Destination fields are the required ones for determining what this upstream
// points to. Depending on DestinationType some other fields below might
// further restrict the set of instances allowable.
//
// DestinationType would be better as an int constant but even with custom
// JSON marshallers it causes havoc with all the mapstructure mangling we do
// on service definitions in various places.
DestinationType string
DestinationNamespace string `json:",omitempty"`
DestinationName string
// Datacenter that the service discovery request should be run against. Note
// for prepared queries, the actual results might be from a different
// datacenter.
Datacenter string
// LocalBindAddress is the ip address a side-car proxy should listen on for
// traffic destined for this upstream service. Default if empty is 127.0.0.1.
LocalBindAddress string `json:",omitempty"`
// LocalBindPort is the ip address a side-car proxy should listen on for traffic
// destined for this upstream service. Required.
LocalBindPort int
// Config is an opaque config that is specific to the proxy process being run.
// It can be used to pass abritrary configuration for this specific upstream
// to the proxy.
Config map[string]interface{}
}
// Validate sanity checks the struct is valid
func (u *Upstream) Validate() error {
if u.DestinationType != UpstreamDestTypeService &&
u.DestinationType != UpstreamDestTypePreparedQuery {
return fmt.Errorf("unknown upstream destination type")
}
if u.DestinationName == "" {
return fmt.Errorf("upstream destination name cannot be empty")
}
if u.LocalBindPort == 0 {
return fmt.Errorf("upstream local bind port cannot be zero")
}
return nil
}
// ToAPI returns the api structs with the same fields. We have duplicates to
// avoid the api package depending on this one which imports a ton of Consul's
// core which you don't want if you are just trying to use our client in your
// app.
func (u *Upstream) ToAPI() api.Upstream {
return api.Upstream{
DestinationType: api.UpstreamDestType(u.DestinationType),
DestinationNamespace: u.DestinationNamespace,
DestinationName: u.DestinationName,
Datacenter: u.Datacenter,
LocalBindAddress: u.LocalBindAddress,
LocalBindPort: u.LocalBindPort,
Config: u.Config,
}
}
// UpstreamFromAPI is a helper for converting api.Upstream to Upstream.
func UpstreamFromAPI(u api.Upstream) Upstream {
return Upstream{
DestinationType: string(u.DestinationType),
DestinationNamespace: u.DestinationNamespace,
DestinationName: u.DestinationName,
Datacenter: u.Datacenter,
LocalBindAddress: u.LocalBindAddress,
LocalBindPort: u.LocalBindPort,
Config: u.Config,
}
}

View File

@ -0,0 +1,181 @@
package structs
import (
"encoding/json"
"testing"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
)
func TestConnectProxyConfig_ToAPI(t *testing.T) {
tests := []struct {
name string
in ConnectProxyConfig
want *api.AgentServiceConnectProxyConfig
}{
{
name: "service",
in: ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web1",
LocalServiceAddress: "127.0.0.2",
LocalServicePort: 5555,
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: Upstreams{
{
DestinationType: UpstreamDestTypeService,
DestinationName: "foo",
Datacenter: "dc1",
LocalBindPort: 1234,
},
{
DestinationType: UpstreamDestTypePreparedQuery,
DestinationName: "foo",
Datacenter: "dc1",
LocalBindPort: 2345,
LocalBindAddress: "127.10.10.10",
},
},
},
want: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web1",
LocalServiceAddress: "127.0.0.2",
LocalServicePort: 5555,
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: []api.Upstream{
{
DestinationType: UpstreamDestTypeService,
DestinationName: "foo",
Datacenter: "dc1",
LocalBindPort: 1234,
},
{
DestinationType: UpstreamDestTypePreparedQuery,
DestinationName: "foo",
Datacenter: "dc1",
LocalBindPort: 2345,
LocalBindAddress: "127.10.10.10",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.want, tt.in.ToAPI())
})
}
}
func TestUpstream_MarshalJSON(t *testing.T) {
tests := []struct {
name string
in Upstream
want string
wantErr bool
}{
{
name: "service",
in: Upstream{
DestinationType: UpstreamDestTypeService,
DestinationName: "foo",
Datacenter: "dc1",
LocalBindPort: 1234,
},
want: `{
"DestinationType": "service",
"DestinationName": "foo",
"Datacenter": "dc1",
"LocalBindPort": 1234,
"Config": null
}`,
wantErr: false,
},
{
name: "pq",
in: Upstream{
DestinationType: UpstreamDestTypePreparedQuery,
DestinationName: "foo",
Datacenter: "dc1",
LocalBindPort: 1234,
},
want: `{
"DestinationType": "prepared_query",
"DestinationName": "foo",
"Datacenter": "dc1",
"LocalBindPort": 1234,
"Config": null
}`,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
got, err := json.Marshal(tt.in)
if tt.wantErr {
require.Error(err)
return
}
require.NoError(err)
require.JSONEq(tt.want, string(got))
})
}
}
func TestUpstream_UnmarshalJSON(t *testing.T) {
tests := []struct {
name string
json string
want Upstream
wantErr bool
}{
{
name: "service",
json: `{
"DestinationType": "service",
"DestinationName": "foo",
"Datacenter": "dc1"
}`,
want: Upstream{
DestinationType: UpstreamDestTypeService,
DestinationName: "foo",
Datacenter: "dc1",
},
wantErr: false,
},
{
name: "pq",
json: `{
"DestinationType": "prepared_query",
"DestinationName": "foo",
"Datacenter": "dc1"
}`,
want: Upstream{
DestinationType: UpstreamDestTypePreparedQuery,
DestinationName: "foo",
Datacenter: "dc1",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
var got Upstream
err := json.Unmarshal([]byte(tt.json), &got)
if tt.wantErr {
require.Error(err)
return
}
require.NoError(err)
require.Equal(tt.want, got)
})
}
}

View File

@ -26,7 +26,22 @@ type ServiceDefinition struct {
Weights *Weights
Token string
EnableTagOverride bool
ProxyDestination string
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
// ProxyDestination is deprecated in favour of Proxy.DestinationServiceName
ProxyDestination string `json:",omitempty"`
// Proxy is the configuration set for Kind = connect-proxy. It is mandatory in
// that case and an error to be set for any other kind. This config is part of
// a proxy service definition and is distinct from but shares some fields with
// the Connect.Proxy which configures a manageged proxy as part of the actual
// service's definition. This duplication is ugly but seemed better than the
// alternative which was to re-use the same struct fields for both cases even
// though the semantics are different and the non-shared fields make no sense
// in the other case. ProxyConfig may be a more natural name here, but it's
// confusing for the UX because one of the fields in ConnectProxyConfig is
// also called just "Config"
Proxy *ConnectProxyConfig
Connect *ServiceConnect
}
@ -41,11 +56,23 @@ func (s *ServiceDefinition) NodeService() *NodeService {
Port: s.Port,
Weights: s.Weights,
EnableTagOverride: s.EnableTagOverride,
ProxyDestination: s.ProxyDestination,
}
if s.Connect != nil {
ns.Connect = *s.Connect
}
if s.Proxy != nil {
ns.Proxy = *s.Proxy
// Ensure the Upstream type is defaulted
for i := range ns.Proxy.Upstreams {
if ns.Proxy.Upstreams[i].DestinationType == "" {
ns.Proxy.Upstreams[i].DestinationType = UpstreamDestTypeService
}
}
} else {
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
// Legacy convert ProxyDestination into a Proxy config
ns.Proxy.DestinationServiceName = s.ProxyDestination
}
if ns.ID == "" && ns.Service != "" {
ns.ID = ns.Service
}
@ -72,18 +99,100 @@ func (s *ServiceDefinition) ConnectManagedProxy() (*ConnectManagedProxy, error)
return nil, err
}
// If upstreams were set in the config and NOT in the actual Upstreams field,
// extract them out to the new explicit Upstreams and unset in config to make
// transition smooth.
if deprecatedUpstreams, ok := s.Connect.Proxy.Config["upstreams"]; ok {
if len(s.Connect.Proxy.Upstreams) == 0 {
if slice, ok := deprecatedUpstreams.([]interface{}); ok {
for _, raw := range slice {
var oldU deprecatedBuiltInProxyUpstreamConfig
var decMeta mapstructure.Metadata
decCfg := &mapstructure.DecoderConfig{
Metadata: &decMeta,
Result: &oldU,
}
dec, err := mapstructure.NewDecoder(decCfg)
if err != nil {
// Just skip it - we never used to parse this so never failed
// invalid stuff till it hit the proxy. This is a best-effort
// attempt to not break existing service definitions so it's not the
// end of the world if we don't have exactly the same failure mode
// for invalid input.
continue
}
err = dec.Decode(raw)
if err != nil {
// same logic as above
continue
}
newT := UpstreamDestTypeService
if oldU.DestinationType == "prepared_query" {
newT = UpstreamDestTypePreparedQuery
}
u := Upstream{
DestinationType: newT,
DestinationName: oldU.DestinationName,
DestinationNamespace: oldU.DestinationNamespace,
Datacenter: oldU.DestinationDatacenter,
LocalBindAddress: oldU.LocalBindAddress,
LocalBindPort: oldU.LocalBindPort,
}
// Any unrecognized keys should be copied into the config map
if len(decMeta.Unused) > 0 {
u.Config = make(map[string]interface{})
// Paranoid type assertion - mapstructure would have errored if this
// wasn't safe but panics are bad...
if rawMap, ok := raw.(map[string]interface{}); ok {
for _, k := range decMeta.Unused {
u.Config[k] = rawMap[k]
}
}
}
s.Connect.Proxy.Upstreams = append(s.Connect.Proxy.Upstreams, u)
}
}
}
// Remove upstreams even if we didn't add them for consistency.
delete(s.Connect.Proxy.Config, "upstreams")
}
p := &ConnectManagedProxy{
ExecMode: execMode,
Command: s.Connect.Proxy.Command,
Config: s.Connect.Proxy.Config,
Upstreams: s.Connect.Proxy.Upstreams,
// ProxyService will be setup when the agent registers the configured
// proxies and starts them etc.
TargetServiceID: ns.ID,
}
// Ensure the Upstream type is defaulted
for i := range p.Upstreams {
if p.Upstreams[i].DestinationType == "" {
p.Upstreams[i].DestinationType = UpstreamDestTypeService
}
}
return p, nil
}
// deprecatedBuiltInProxyUpstreamConfig is a struct for extracting old
// connect/proxy.UpstreamConfiguration syntax upstreams from existing managed
// proxy configs to convert them to new first-class Upstreams.
type deprecatedBuiltInProxyUpstreamConfig struct {
LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr" mapstructure:"local_bind_address"`
LocalBindPort int `json:"local_bind_port" hcl:"local_bind_port,attr" mapstructure:"local_bind_port"`
DestinationName string `json:"destination_name" hcl:"destination_name,attr" mapstructure:"destination_name"`
DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr" mapstructure:"destination_namespace"`
DestinationType string `json:"destination_type" hcl:"destination_type,attr" mapstructure:"destination_type"`
DestinationDatacenter string `json:"destination_datacenter" hcl:"destination_datacenter,attr" mapstructure:"destination_datacenter"`
// ConnectTimeoutMs is removed explicitly because any additional config we
// find including this field should be put into the opaque Config map in
// Upstream.
}
// Validate validates the service definition. This also calls the underlying
// Validate method on the NodeService.
//
@ -140,6 +249,7 @@ type ServiceDefinitionConnectProxy struct {
Command []string `json:",omitempty"`
ExecMode string `json:",omitempty"`
Config map[string]interface{} `json:",omitempty"`
Upstreams []Upstream `json:",omitempty"`
}
func (p *ServiceDefinitionConnectProxy) MarshalJSON() ([]byte, error) {

View File

@ -84,6 +84,7 @@ func TestServiceDefinitionValidate(t *testing.T) {
{
"managed proxy with no port set",
func(x *ServiceDefinition) {
x.Port = 0 // Explicitly unset this as the test default sets it sanely
x.Connect = &ServiceConnect{
Proxy: &ServiceDefinitionConnectProxy{},
}
@ -111,13 +112,12 @@ func TestServiceDefinitionValidate(t *testing.T) {
tc.Modify(service)
err := service.Validate()
t.Logf("error: %s", err)
require.Equal(err != nil, tc.Err != "")
if err == nil {
return
}
if tc.Err == "" {
require.NoError(err)
} else {
require.Error(err)
require.Contains(strings.ToLower(err.Error()), strings.ToLower(tc.Err))
}
})
}
}

View File

@ -524,7 +524,9 @@ type ServiceNode struct {
ServiceMeta map[string]string
ServicePort int
ServiceEnableTagOverride bool
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
ServiceProxyDestination string
ServiceProxy ConnectProxyConfig
ServiceConnect ServiceConnect
RaftIndex
@ -554,7 +556,9 @@ func (s *ServiceNode) PartialClone() *ServiceNode {
ServiceMeta: nsmeta,
ServiceWeights: s.ServiceWeights,
ServiceEnableTagOverride: s.ServiceEnableTagOverride,
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
ServiceProxyDestination: s.ServiceProxyDestination,
ServiceProxy: s.ServiceProxy,
ServiceConnect: s.ServiceConnect,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
@ -575,7 +579,7 @@ func (s *ServiceNode) ToNodeService() *NodeService {
Meta: s.ServiceMeta,
Weights: &s.ServiceWeights,
EnableTagOverride: s.ServiceEnableTagOverride,
ProxyDestination: s.ServiceProxyDestination,
Proxy: s.ServiceProxy,
Connect: s.ServiceConnect,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
@ -624,13 +628,32 @@ type NodeService struct {
Weights *Weights
EnableTagOverride bool
// ProxyDestination is the name of the service that this service is
// a Connect proxy for. This is only valid if Kind is "connect-proxy".
// The destination may be a service that isn't present in the catalog.
// This is expected and allowed to allow for proxies to come up
// earlier than their target services.
// ProxyDestination is DEPRECATED in favor of Proxy.DestinationServiceName.
// It's retained since this struct is used to parse input for
// /catalog/register but nothing else internal should use it - once
// request/config definitinos are passes all internal uses of NodeService
// should have this empty and use the Proxy.DestinationServiceNames field
// below.
//
// It used to store the name of the service that this service is a Connect
// proxy for. This is only valid if Kind is "connect-proxy". The destination
// may be a service that isn't present in the catalog. This is expected and
// allowed to allow for proxies to come up earlier than their target services.
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
ProxyDestination string
// Proxy is the configuration set for Kind = connect-proxy. It is mandatory in
// that case and an error to be set for any other kind. This config is part of
// a proxy service definition and is distinct from but shares some fields with
// the Connect.Proxy which configures a manageged proxy as part of the actual
// service's definition. This duplication is ugly but seemed better than the
// alternative which was to re-use the same struct fields for both cases even
// though the semantics are different and the non-shred fields make no sense
// in the other case. ProxyConfig may be a more natural name here, but it's
// confusing for the UX because one of the fields in ConnectProxyConfig is
// also called just "Config"
Proxy ConnectProxyConfig
// Connect are the Connect settings for a service. This is purposely NOT
// a pointer so that we never have to nil-check this.
Connect ServiceConnect
@ -661,9 +684,17 @@ func (s *NodeService) Validate() error {
// ConnectProxy validation
if s.Kind == ServiceKindConnectProxy {
if strings.TrimSpace(s.ProxyDestination) == "" {
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
// Fixup legacy requests that specify the ProxyDestination still
if s.ProxyDestination != "" && s.Proxy.DestinationServiceName == "" {
s.Proxy.DestinationServiceName = s.ProxyDestination
s.ProxyDestination = ""
}
if strings.TrimSpace(s.Proxy.DestinationServiceName) == "" {
result = multierror.Append(result, fmt.Errorf(
"ProxyDestination must be non-empty for Connect proxy services"))
"Proxy.DestinationServiceName must be non-empty for Connect proxy "+
"services"))
}
if s.Port == 0 {
@ -694,7 +725,7 @@ func (s *NodeService) IsSame(other *NodeService) bool {
!reflect.DeepEqual(s.Meta, other.Meta) ||
s.EnableTagOverride != other.EnableTagOverride ||
s.Kind != other.Kind ||
s.ProxyDestination != other.ProxyDestination ||
!reflect.DeepEqual(s.Proxy, other.Proxy) ||
s.Connect != other.Connect {
return false
}
@ -713,6 +744,11 @@ func (s *NodeService) ToServiceNode(node string) *ServiceNode {
theWeights = *s.Weights
}
}
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
legacyProxyDest := s.Proxy.DestinationServiceName
if legacyProxyDest == "" {
legacyProxyDest = s.ProxyDestination
}
return &ServiceNode{
// Skip ID, see ServiceNode definition.
Node: node,
@ -727,7 +763,8 @@ func (s *NodeService) ToServiceNode(node string) *ServiceNode {
ServiceMeta: s.Meta,
ServiceWeights: theWeights,
ServiceEnableTagOverride: s.EnableTagOverride,
ServiceProxyDestination: s.ProxyDestination,
ServiceProxy: s.Proxy,
ServiceProxyDestination: legacyProxyDest,
ServiceConnect: s.Connect,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEncodeDecode(t *testing.T) {
@ -122,7 +123,7 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) {
}
// testServiceNode gives a fully filled out ServiceNode instance.
func testServiceNode() *ServiceNode {
func testServiceNode(t *testing.T) *ServiceNode {
return &ServiceNode{
ID: types.NodeID("40e4a748-2192-161a-0510-9bf59fe950b5"),
Node: "node1",
@ -144,16 +145,25 @@ func testServiceNode() *ServiceNode {
"service": "metadata",
},
ServiceEnableTagOverride: true,
ServiceProxyDestination: "cats",
RaftIndex: RaftIndex{
CreateIndex: 1,
ModifyIndex: 2,
},
ServiceProxy: TestConnectProxyConfig(t),
// DEPRECATED (ProxyDestination) - remove this when removing ProxyDestination
// ServiceProxyDestination is deprecated bit must be set consistently with
// the value of ServiceProxy.DestinationServiceName otherwise a round-trip
// through ServiceNode -> NodeService and back will not match and fail
// tests.
ServiceProxyDestination: "web",
ServiceConnect: ServiceConnect{
Native: true,
},
}
}
func TestStructs_ServiceNode_PartialClone(t *testing.T) {
sn := testServiceNode()
sn := testServiceNode(t)
clone := sn.PartialClone()
@ -173,9 +183,7 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) {
sn.Datacenter = ""
sn.TaggedAddresses = nil
sn.NodeMeta = nil
if !reflect.DeepEqual(sn, clone) {
t.Fatalf("bad: %v", clone)
}
require.Equal(t, sn, clone)
sn.ServiceTags = append(sn.ServiceTags, "hello")
if reflect.DeepEqual(sn, clone) {
@ -201,7 +209,7 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) {
}
func TestStructs_ServiceNode_Conversions(t *testing.T) {
sn := testServiceNode()
sn := testServiceNode(t)
sn2 := sn.ToNodeService().ToServiceNode("node1")
@ -213,9 +221,7 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) {
sn.TaggedAddresses = nil
sn.NodeMeta = nil
sn.ServiceWeights = Weights{Passing: 1, Warning: 1}
if !reflect.DeepEqual(sn, sn2) {
t.Fatalf("bad: %#v, but expected %#v", sn2, sn)
}
require.Equal(t, sn, sn2)
}
func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
@ -232,19 +238,19 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
{
"connect-proxy: no ProxyDestination",
func(x *NodeService) { x.ProxyDestination = "" },
"ProxyDestination must be",
func(x *NodeService) { x.Proxy.DestinationServiceName = "" },
"Proxy.DestinationServiceName must be",
},
{
"connect-proxy: whitespace ProxyDestination",
func(x *NodeService) { x.ProxyDestination = " " },
"ProxyDestination must be",
func(x *NodeService) { x.Proxy.DestinationServiceName = " " },
"Proxy.DestinationServiceName must be",
},
{
"connect-proxy: valid ProxyDestination",
func(x *NodeService) { x.ProxyDestination = "hello" },
func(x *NodeService) { x.Proxy.DestinationServiceName = "hello" },
"",
},
@ -290,7 +296,12 @@ func TestStructs_NodeService_IsSame(t *testing.T) {
},
Port: 1234,
EnableTagOverride: true,
ProxyDestination: "db",
Proxy: ConnectProxyConfig{
DestinationServiceName: "db",
Config: map[string]interface{}{
"foo": "bar",
},
},
}
if !ns.IsSame(ns) {
t.Fatalf("should be equal to itself")
@ -308,7 +319,12 @@ func TestStructs_NodeService_IsSame(t *testing.T) {
"meta2": "value2",
"meta1": "value1",
},
ProxyDestination: "db",
Proxy: ConnectProxyConfig{
DestinationServiceName: "db",
Config: map[string]interface{}{
"foo": "bar",
},
},
RaftIndex: RaftIndex{
CreateIndex: 1,
ModifyIndex: 2,
@ -319,6 +335,7 @@ func TestStructs_NodeService_IsSame(t *testing.T) {
}
check := func(twiddle, restore func()) {
t.Helper()
if !ns.IsSame(other) || !other.IsSame(ns) {
t.Fatalf("should be the same")
}
@ -330,7 +347,7 @@ func TestStructs_NodeService_IsSame(t *testing.T) {
restore()
if !ns.IsSame(other) || !other.IsSame(ns) {
t.Fatalf("should be the same")
t.Fatalf("should be the same again")
}
}
@ -343,7 +360,11 @@ func TestStructs_NodeService_IsSame(t *testing.T) {
check(func() { other.Meta["meta2"] = "wrongValue" }, func() { other.Meta["meta2"] = "value2" })
check(func() { other.EnableTagOverride = false }, func() { other.EnableTagOverride = true })
check(func() { other.Kind = ServiceKindConnectProxy }, func() { other.Kind = "" })
check(func() { other.ProxyDestination = "" }, func() { other.ProxyDestination = "db" })
check(func() { other.Proxy.DestinationServiceName = "" }, func() { other.Proxy.DestinationServiceName = "db" })
check(func() { other.Proxy.DestinationServiceID = "XXX" }, func() { other.Proxy.DestinationServiceID = "" })
check(func() { other.Proxy.LocalServiceAddress = "XXX" }, func() { other.Proxy.LocalServiceAddress = "" })
check(func() { other.Proxy.LocalServicePort = 9999 }, func() { other.Proxy.LocalServicePort = 0 })
check(func() { other.Proxy.Config["baz"] = "XXX" }, func() { delete(other.Proxy.Config, "baz") })
check(func() { other.Connect.Native = true }, func() { other.Connect.Native = false })
}

View File

@ -42,9 +42,9 @@ func TestNodeService(t testing.T) *NodeService {
func TestNodeServiceProxy(t testing.T) *NodeService {
return &NodeService{
Kind: ServiceKindConnectProxy,
Service: "connect-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 2222,
ProxyDestination: "web",
Proxy: TestConnectProxyConfig(t),
}
}

View File

@ -0,0 +1,36 @@
package structs
import "github.com/mitchellh/go-testing-interface"
// TestConnectProxyConfig returns a ConnectProxyConfig representing a valid
// Connect proxy.
func TestConnectProxyConfig(t testing.T) ConnectProxyConfig {
return ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: TestUpstreams(t),
}
}
// TestUpstreams returns a set of upstreams to be used in tests exercising most
// important configuration patterns.
func TestUpstreams(t testing.T) Upstreams {
return Upstreams{
{
// We rely on this one having default type in a few tests...
DestinationName: "db",
LocalBindPort: 9191,
Config: map[string]interface{}{
// Float because this is how it is decoded by JSON decoder so this
// enables the value returned to be compared directly to a decoded JSON
// response without spurios type loss.
"connect_timeout_ms": float64(1000),
},
},
{
DestinationType: UpstreamDestTypePreparedQuery,
DestinationName: "geo-cache",
LocalBindPort: 8181,
LocalBindAddress: "127.10.10.10",
},
}
}

View File

@ -8,6 +8,7 @@ import (
func TestServiceDefinition(t testing.T) *ServiceDefinition {
return &ServiceDefinition{
Name: "db",
Port: 1234,
}
}
@ -15,6 +16,10 @@ func TestServiceDefinition(t testing.T) *ServiceDefinition {
func TestServiceDefinitionProxy(t testing.T) *ServiceDefinition {
return &ServiceDefinition{
Kind: ServiceKindConnectProxy,
ProxyDestination: "db",
Name: "foo-proxy",
Port: 1234,
Proxy: &ConnectProxyConfig{
DestinationServiceName: "db",
},
}
}

View File

@ -38,6 +38,18 @@ const (
ProxyExecModeScript ProxyExecMode = "script"
)
// UpstreamDestType is the type of upstream discovery mechanism.
type UpstreamDestType string
const (
// UpstreamDestTypeService discovers instances via healthy service lookup.
UpstreamDestTypeService UpstreamDestType = "service"
// UpstreamDestTypePreparedQuery discovers instances via prepared query
// execution.
UpstreamDestTypePreparedQuery UpstreamDestType = "prepared_query"
)
// AgentCheck represents a check known to the agent
type AgentCheck struct {
Node string
@ -59,7 +71,7 @@ type AgentWeights struct {
// AgentService represents a service known to the agent
type AgentService struct {
Kind ServiceKind
Kind ServiceKind `json:",omitempty"`
ID string
Service string
Tags []string
@ -68,24 +80,38 @@ type AgentService struct {
Address string
Weights AgentWeights
EnableTagOverride bool
CreateIndex uint64
ModifyIndex uint64
ProxyDestination string
Connect *AgentServiceConnect
CreateIndex uint64 `json:",omitempty"`
ModifyIndex uint64 `json:",omitempty"`
// DEPRECATED (ProxyDestination) - remove this field
ProxyDestination string `json:",omitempty"`
Proxy *AgentServiceConnectProxyConfig `json:",omitempty"`
Connect *AgentServiceConnect `json:",omitempty"`
}
// AgentServiceConnect represents the Connect configuration of a service.
type AgentServiceConnect struct {
Native bool
Proxy *AgentServiceConnectProxy
Native bool `json:",omitempty"`
Proxy *AgentServiceConnectProxy `json:",omitempty"`
}
// AgentServiceConnectProxy represents the Connect Proxy configuration of a
// service.
type AgentServiceConnectProxy struct {
ExecMode ProxyExecMode
Command []string
Config map[string]interface{}
ExecMode ProxyExecMode `json:",omitempty"`
Command []string `json:",omitempty"`
Config map[string]interface{} `json:",omitempty"`
Upstreams []Upstream `json:",omitempty"`
}
// AgentServiceConnectProxyConfig is the proxy configuration in a connect-proxy
// ServiceDefinition or response.
type AgentServiceConnectProxyConfig struct {
DestinationServiceName string
DestinationServiceID string `json:",omitempty"`
LocalServiceAddress string `json:",omitempty"`
LocalServicePort int `json:",omitempty"`
Config map[string]interface{} `json:",omitempty"`
Upstreams []Upstream
}
// AgentMember represents a cluster member known to the agent
@ -129,7 +155,9 @@ type AgentServiceRegistration struct {
Weights *AgentWeights `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
// DEPRECATED (ProxyDestination) - remove this field
ProxyDestination string `json:",omitempty"`
Proxy *AgentServiceConnectProxyConfig `json:",omitempty"`
Connect *AgentServiceConnect `json:",omitempty"`
}
@ -236,6 +264,18 @@ type ConnectProxyConfig struct {
ExecMode ProxyExecMode
Command []string
Config map[string]interface{}
Upstreams []Upstream
}
// Upstream is the response structure for a proxy upstream configuration.
type Upstream struct {
DestinationType UpstreamDestType `json:",omitempty"`
DestinationNamespace string `json:",omitempty"`
DestinationName string
Datacenter string `json:",omitempty"`
LocalBindAddress string `json:",omitempty"`
LocalBindPort int `json:",omitempty"`
Config map[string]interface{} `json:",omitempty"`
}
// Agent can be used to query the Agent endpoints

View File

@ -207,6 +207,11 @@ func TestAPI_AgentServices_ManagedConnectProxy(t *testing.T) {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: []Upstream{{
DestinationType: "prepared_query",
DestinationName: "bar",
LocalBindPort: 9191,
}},
},
},
}
@ -235,7 +240,82 @@ func TestAPI_AgentServices_ManagedConnectProxy(t *testing.T) {
t.Fatalf("Bad: %#v", chk)
}
// Proxy config should be present in response
// Proxy config should be correct
require.Equal(t, reg.Connect, services["foo"].Connect)
if err := agent.ServiceDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAPI_AgentServices_ManagedConnectProxyDeprecatedUpstreams(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentServiceRegistration{
Name: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
Check: &AgentServiceCheck{
TTL: "15s",
},
Connect: &AgentServiceConnect{
Proxy: &AgentServiceConnectProxy{
ExecMode: ProxyExecModeScript,
Command: []string{"foo.rb"},
Config: map[string]interface{}{
"foo": "bar",
"upstreams": []interface{}{
map[string]interface{}{
"destination_type": "prepared_query",
"destination_name": "bar",
"local_bind_port": 9191,
"connect_timeout_ms": 1000,
},
},
},
},
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services["foo"]; !ok {
t.Fatalf("missing service: %v", services)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
chk, ok := checks["service:foo"]
if !ok {
t.Fatalf("missing check: %v", checks)
}
// Checks should default to critical
if chk.Status != HealthCritical {
t.Fatalf("Bad: %#v", chk)
}
// Proxy config should be present in response, minus the upstreams
delete(reg.Connect.Proxy.Config, "upstreams")
// Upstreams should be translated into proper field
reg.Connect.Proxy.Upstreams = []Upstream{{
DestinationType: "prepared_query",
DestinationName: "bar",
LocalBindPort: 9191,
Config: map[string]interface{}{
"connect_timeout_ms": float64(1000),
},
}}
require.Equal(t, reg.Connect, services["foo"].Connect)
if err := agent.ServiceDeregister("foo"); err != nil {
@ -263,7 +343,9 @@ func TestAPI_AgentServices_ExternalConnectProxy(t *testing.T) {
Kind: ServiceKindConnectProxy,
Name: "foo-proxy",
Port: 8001,
ProxyDestination: "foo",
Proxy: &AgentServiceConnectProxyConfig{
DestinationServiceName: "foo",
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
@ -1139,6 +1221,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: testUpstreams(t),
},
},
}
@ -1152,7 +1235,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
ProxyServiceID: "foo-proxy",
TargetServiceID: "foo",
TargetServiceName: "foo",
ContentHash: "2a29f8237db69d0e",
ContentHash: "acdf5eb6f5794a14",
ExecMode: "daemon",
Command: []string{"consul", "connect", "proxy"},
Config: map[string]interface{}{
@ -1161,6 +1244,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
"foo": "bar",
"local_service_address": "127.0.0.1:8000",
},
Upstreams: testExpectUpstreamsWithDefaults(t, reg.Connect.Proxy.Upstreams),
}
require.Equal(t, expectConfig, config)
require.Equal(t, expectConfig.ContentHash, qm.LastContentHash)

View File

@ -31,6 +31,9 @@ type CatalogService struct {
ServicePort int
ServiceWeights Weights
ServiceEnableTagOverride bool
// DEPRECATED (ProxyDestination) - remove the next comment!
// We forgot to ever add ServiceProxyDestination here so no need to deprecate!
ServiceProxy *AgentServiceConnectProxyConfig
CreateIndex uint64
ModifyIndex uint64
}

View File

@ -1,6 +1,7 @@
package api
import (
"reflect"
"testing"
"time"
@ -218,6 +219,43 @@ func TestAPI_CatalogService(t *testing.T) {
})
}
func TestAPI_CatalogServiceUnmanagedProxy(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
catalog := c.Catalog()
proxyReg := testUnmanagedProxyRegistration(t)
retry.Run(t, func(r *retry.R) {
_, err := catalog.Register(proxyReg, nil)
r.Check(err)
services, meta, err := catalog.Service("web-proxy", "", nil)
if err != nil {
r.Fatal(err)
}
if meta.LastIndex == 0 {
r.Fatalf("Bad: %v", meta)
}
if len(services) == 0 {
r.Fatalf("Bad: %v", services)
}
if services[0].Datacenter != "dc1" {
r.Fatalf("Bad datacenter: %v", services[0])
}
if !reflect.DeepEqual(services[0].ServiceProxy, proxyReg.Service.Proxy) {
r.Fatalf("bad proxy.\nwant: %v\n got: %v", proxyReg.Service.Proxy,
services[0].ServiceProxy)
}
})
}
func TestAPI_CatalogServiceCached(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -286,6 +324,64 @@ func TestAPI_CatalogService_NodeMetaFilter(t *testing.T) {
})
}
func testUpstreams(t *testing.T) []Upstream {
return []Upstream{
{
DestinationName: "db",
LocalBindPort: 9191,
Config: map[string]interface{}{
"connect_timeout_ms": float64(1000),
},
},
{
DestinationType: UpstreamDestTypePreparedQuery,
DestinationName: "geo-cache",
LocalBindPort: 8181,
},
}
}
func testExpectUpstreamsWithDefaults(t *testing.T, upstreams []Upstream) []Upstream {
ups := make([]Upstream, len(upstreams))
for i := range upstreams {
ups[i] = upstreams[i]
// Fill in default fields we expect to have back explicitly in a response
if ups[i].DestinationType == "" {
ups[i].DestinationType = UpstreamDestTypeService
}
}
return ups
}
// testUnmanagedProxy returns a fully configured external proxy service suitable
// for checking that all the config fields make it back in a response intact.
func testUnmanagedProxy(t *testing.T) *AgentService {
return &AgentService{
Kind: ServiceKindConnectProxy,
Proxy: &AgentServiceConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web1",
LocalServiceAddress: "127.0.0.2",
LocalServicePort: 8080,
Upstreams: testUpstreams(t),
},
ID: "web-proxy1",
Service: "web-proxy",
Port: 8001,
}
}
// testUnmanagedProxyRegistration returns a *CatalogRegistration for a fully
// configured external proxy.
func testUnmanagedProxyRegistration(t *testing.T) *CatalogRegistration {
return &CatalogRegistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
Service: testUnmanagedProxy(t),
}
}
func TestAPI_CatalogConnect(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -294,25 +390,27 @@ func TestAPI_CatalogConnect(t *testing.T) {
catalog := c.Catalog()
// Register service and proxy instances to test against.
proxyReg := testUnmanagedProxyRegistration(t)
proxy := proxyReg.Service
// DEPRECATED (ProxyDestination) - remove this case when the field is removed
deprecatedProxyReg := testUnmanagedProxyRegistration(t)
deprecatedProxyReg.Service.ProxyDestination = deprecatedProxyReg.Service.Proxy.DestinationServiceName
deprecatedProxyReg.Service.Proxy = nil
service := &AgentService{
ID: "redis1",
Service: "redis",
ID: proxyReg.Service.Proxy.DestinationServiceID,
Service: proxyReg.Service.Proxy.DestinationServiceName,
Port: 8000,
}
proxy := &AgentService{
Kind: ServiceKindConnectProxy,
ProxyDestination: "redis",
ID: "redis-proxy1",
Service: "redis-proxy",
Port: 8001,
}
check := &AgentCheck{
Node: "foobar",
CheckID: "service:redis1",
CheckID: "service:" + service.ID,
Name: "Redis health check",
Notes: "Script based health check",
Status: HealthPassing,
ServiceID: "redis1",
ServiceID: service.ID,
}
reg := &CatalogRegistration{
@ -322,22 +420,20 @@ func TestAPI_CatalogConnect(t *testing.T) {
Service: service,
Check: check,
}
proxyReg := &CatalogRegistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
Service: proxy,
}
retry.Run(t, func(r *retry.R) {
if _, err := catalog.Register(reg, nil); err != nil {
r.Fatal(err)
}
// First try to register deprecated proxy, shouldn't error
if _, err := catalog.Register(deprecatedProxyReg, nil); err != nil {
r.Fatal(err)
}
if _, err := catalog.Register(proxyReg, nil); err != nil {
r.Fatal(err)
}
services, meta, err := catalog.Connect("redis", "", nil)
services, meta, err := catalog.Connect(proxyReg.Service.Proxy.DestinationServiceName, "", nil)
if err != nil {
r.Fatal(err)
}
@ -357,8 +453,12 @@ func TestAPI_CatalogConnect(t *testing.T) {
if services[0].ServicePort != proxy.Port {
r.Fatalf("Returned port should be for proxy: %v", services[0])
}
})
if !reflect.DeepEqual(services[0].ServiceProxy, proxy.Proxy) {
r.Fatalf("Returned proxy config should match:\nWant: %v\n Got: %v",
proxy.Proxy, services[0].ServiceProxy)
}
})
}
func TestAPI_CatalogConnectNative(t *testing.T) {
@ -426,8 +526,19 @@ func TestAPI_CatalogNode(t *testing.T) {
defer s.Stop()
catalog := c.Catalog()
name, _ := c.Agent().NodeName()
name, err := c.Agent().NodeName()
require.NoError(t, err)
proxyReg := testUnmanagedProxyRegistration(t)
proxyReg.Node = name
proxyReg.SkipNodeUpdate = true
retry.Run(t, func(r *retry.R) {
// Register a connect proxy to ensure all it's config fields are returned
_, err := catalog.Register(proxyReg, nil)
r.Check(err)
info, meta, err := catalog.Node(name, nil)
if err != nil {
r.Fatal(err)
@ -437,17 +548,26 @@ func TestAPI_CatalogNode(t *testing.T) {
r.Fatalf("Bad: %v", meta)
}
if len(info.Services) == 0 {
r.Fatalf("Bad: %v", info)
if len(info.Services) != 2 {
r.Fatalf("Bad: %v (len %d)", info, len(info.Services))
}
if _, ok := info.Node.TaggedAddresses["wan"]; !ok {
r.Fatalf("Bad: %v", info)
r.Fatalf("Bad: %v", info.Node.TaggedAddresses)
}
if info.Node.Datacenter != "dc1" {
r.Fatalf("Bad datacenter: %v", info)
}
if _, ok := info.Services["web-proxy1"]; !ok {
r.Fatalf("Missing proxy service: %v", info.Services)
}
if !reflect.DeepEqual(proxyReg.Service.Proxy, info.Services["web-proxy1"].Proxy) {
r.Fatalf("Bad proxy config:\nwant %v\n got: %v", proxyReg.Service.Proxy,
info.Services["web-proxy"].Proxy)
}
})
}
@ -488,7 +608,9 @@ func TestAPI_CatalogRegistration(t *testing.T) {
Service: "redis-proxy",
Port: 8001,
Kind: ServiceKindConnectProxy,
ProxyDestination: service.ID,
Proxy: &AgentServiceConnectProxyConfig{
DestinationServiceName: service.Service,
},
}
proxyReg := &CatalogRegistration{
Datacenter: "dc1",

View File

@ -305,7 +305,9 @@ func TestAPI_HealthConnect(t *testing.T) {
Name: "foo-proxy",
Port: 8001,
Kind: ServiceKindConnectProxy,
ProxyDestination: "foo",
Proxy: &AgentServiceConnectProxyConfig{
DestinationServiceName: "foo",
},
}
err = agent.ServiceRegister(proxyReg)
require.NoError(t, err)

View File

@ -5,6 +5,7 @@ import (
"strconv"
"strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect/proxy"
)
@ -64,7 +65,7 @@ func (f *FlagUpstreams) Set(value string) error {
LocalBindAddress: addr,
LocalBindPort: int(port),
DestinationName: name,
DestinationType: destinationType,
DestinationType: api.UpstreamDestType(destinationType),
}
return nil

View File

@ -66,15 +66,6 @@ type cmd struct {
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.flags.StringVar(&c.cfgFile, "dev-config", "",
"If set, proxy config is read on startup from this file (in HCL or JSON"+
"format). If a config file is given, the proxy will use that instead of "+
"querying the local agent for it's configuration. It will not reload it "+
"except on startup. In this mode the proxy WILL NOT authorize incoming "+
"connections with the local agent which is totally insecure. This is "+
"ONLY for internal development and testing and will probably be removed "+
"once proxy implementation is more complete..")
c.flags.StringVar(&c.proxyID, "proxy-id", "",
"The proxy's ID on the local agent.")
@ -210,17 +201,6 @@ func (c *cmd) Run(args []string) int {
}
func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error) {
// Manual configuration file is specified.
if c.cfgFile != "" {
cfg, err := proxyImpl.ParseConfigFile(c.cfgFile)
if err != nil {
return nil, err
}
c.UI.Info("Configuration mode: File")
return proxyImpl.NewStaticConfigWatcher(cfg), nil
}
// Use the configured proxy ID
if c.proxyID != "" {
c.UI.Info("Configuration mode: Agent API")

View File

@ -197,11 +197,13 @@ func (r *RegisterMonitor) register() {
// If we're here, then we're registering the service.
err = r.Client.Agent().ServiceRegister(&api.AgentServiceRegistration{
Kind: api.ServiceKindConnectProxy,
ProxyDestination: r.Service,
ID: serviceID,
Name: serviceName,
Address: r.LocalAddress,
Port: r.LocalPort,
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: r.Service,
},
Check: &api.AgentServiceCheck{
CheckID: r.checkID(),
Name: "proxy heartbeat",

View File

@ -24,7 +24,7 @@ func TestRegisterMonitor_good(t *testing.T) {
// Verify the settings
require.Equal(api.ServiceKindConnectProxy, service.Kind)
require.Equal("foo", service.ProxyDestination)
require.Equal("foo", service.Proxy.DestinationServiceName)
require.Equal("127.0.0.1", service.Address)
require.Equal(1234, service.Port)

View File

@ -2,8 +2,8 @@ package proxy
import (
"fmt"
"io/ioutil"
"log"
"time"
"github.com/mitchellh/mapstructure"
@ -11,7 +11,6 @@ import (
"github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/hcl"
)
// Config is the publicly configurable state for an entire proxy instance. It's
@ -85,43 +84,21 @@ func (plc *PublicListenerConfig) applyDefaults() {
}
}
// UpstreamConfig configures an upstream (outgoing) listener.
type UpstreamConfig struct {
// LocalAddress is the host/ip to listen on for local app connections. Defaults to 127.0.0.1.
LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr" mapstructure:"local_bind_address"`
// UpstreamConfig is an alias for api.Upstream so we can parse in a compatible
// way but define custom methods for accessing the opaque config metadata.
type UpstreamConfig api.Upstream
LocalBindPort int `json:"local_bind_port" hcl:"local_bind_port,attr" mapstructure:"local_bind_port"`
// DestinationName is the service name of the destination.
DestinationName string `json:"destination_name" hcl:"destination_name,attr" mapstructure:"destination_name"`
// DestinationNamespace is the namespace of the destination.
DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr" mapstructure:"destination_namespace"`
// DestinationType determines which service discovery method is used to find a
// candidate instance to connect to.
DestinationType string `json:"destination_type" hcl:"destination_type,attr" mapstructure:"destination_type"`
// DestinationDatacenter is the datacenter the destination is in. If empty,
// defaults to discovery within the same datacenter.
DestinationDatacenter string `json:"destination_datacenter" hcl:"destination_datacenter,attr" mapstructure:"destination_datacenter"`
// ConnectTimeout is the timeout for establishing connections with the remote
// service instance. Defaults to 10,000 (10s).
ConnectTimeoutMs int `json:"connect_timeout_ms" hcl:"connect_timeout_ms,attr" mapstructure:"connect_timeout_ms"`
// resolver is used to plug in the service discover mechanism. It can be used
// in tests to bypass discovery. In real usage it is used to inject the
// api.Client dependency from the remainder of the config struct parsed from
// the user JSON using the UpstreamResolverFromClient helper.
resolver connect.Resolver
// ConnectTimeout returns the connect timeout field of the nested config struct
// or the default value.
func (uc *UpstreamConfig) ConnectTimeout() time.Duration {
if ms, ok := uc.Config["connect_timeout_ms"].(int); ok {
return time.Duration(ms) * time.Millisecond
}
return 10000 * time.Millisecond
}
// applyDefaults sets zero-valued params to a sane default.
func (uc *UpstreamConfig) applyDefaults() {
if uc.ConnectTimeoutMs == 0 {
uc.ConnectTimeoutMs = 10000
}
if uc.DestinationType == "" {
uc.DestinationType = "service"
}
@ -140,11 +117,11 @@ func (uc *UpstreamConfig) String() string {
uc.DestinationType, uc.DestinationNamespace, uc.DestinationName)
}
// UpstreamResolverFromClient returns a ConsulResolver that can resolve the
// given UpstreamConfig using the provided api.Client dependency.
func UpstreamResolverFromClient(client *api.Client,
cfg UpstreamConfig) connect.Resolver {
// UpstreamResolverFuncFromClient returns a closure that captures a consul
// client and when called provides aConsulResolver that can resolve the given
// UpstreamConfig using the provided api.Client dependency.
func UpstreamResolverFuncFromClient(client *api.Client) func(cfg UpstreamConfig) (connect.Resolver, error) {
return func(cfg UpstreamConfig) (connect.Resolver, error) {
// For now default to service as it has the most natural meaning and the error
// that the service doesn't exist is probably reasonable if misconfigured. We
// should probably handle actual configs that have invalid types at a higher
@ -158,7 +135,8 @@ func UpstreamResolverFromClient(client *api.Client,
Namespace: cfg.DestinationNamespace,
Name: cfg.DestinationName,
Type: typ,
Datacenter: cfg.DestinationDatacenter,
Datacenter: cfg.Datacenter,
}, nil
}
}
@ -195,28 +173,6 @@ func (sc *StaticConfigWatcher) Watch() <-chan *Config {
return sc.ch
}
// ParseConfigFile parses proxy configuration from a file for local dev.
func ParseConfigFile(filename string) (*Config, error) {
bs, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
var cfg Config
err = hcl.Unmarshal(bs, &cfg)
if err != nil {
return nil, err
}
cfg.PublicListener.applyDefaults()
for idx := range cfg.Upstreams {
cfg.Upstreams[idx].applyDefaults()
}
return &cfg, nil
}
// AgentConfigWatcher watches the local Consul agent for proxy config changes.
type AgentConfigWatcher struct {
client *api.Client
@ -282,14 +238,10 @@ func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal,
}
cfg.PublicListener.applyDefaults()
err = mapstructure.Decode(resp.Config["upstreams"], &cfg.Upstreams)
if err != nil {
w.logger.Printf("[ERR] proxy config watch upstream listener config "+
"couldn't be parsed: %s", err)
return
}
for i := range cfg.Upstreams {
cfg.Upstreams[i].applyDefaults()
for _, u := range resp.Upstreams {
uc := UpstreamConfig(u)
uc.applyDefaults()
cfg.Upstreams = append(cfg.Upstreams, uc)
}
// Parsed config OK, deliver it!

View File

@ -14,45 +14,7 @@ import (
"github.com/stretchr/testify/require"
)
func TestParseConfigFile(t *testing.T) {
t.Parallel()
cfg, err := ParseConfigFile("testdata/config-kitchensink.hcl")
require.Nil(t, err)
expect := &Config{
Token: "11111111-2222-3333-4444-555555555555",
ProxiedServiceName: "web",
ProxiedServiceNamespace: "default",
PublicListener: PublicListenerConfig{
BindAddress: "127.0.0.1",
BindPort: 9999,
LocalServiceAddress: "127.0.0.1:5000",
LocalConnectTimeoutMs: 1000,
HandshakeTimeoutMs: 10000, // From defaults
},
Upstreams: []UpstreamConfig{
{
LocalBindAddress: "127.0.0.1:6000",
DestinationName: "db",
DestinationNamespace: "default",
DestinationType: "service",
ConnectTimeoutMs: 10000,
},
{
LocalBindAddress: "127.0.0.1:6001",
DestinationName: "geo-cache",
DestinationNamespace: "default",
DestinationType: "prepared_query",
ConnectTimeoutMs: 10000,
},
},
}
require.Equal(t, expect, cfg)
}
func TestUpstreamResolverFromClient(t *testing.T) {
func TestUpstreamResolverFuncFromClient(t *testing.T) {
t.Parallel()
tests := []struct {
@ -65,7 +27,7 @@ func TestUpstreamResolverFromClient(t *testing.T) {
cfg: UpstreamConfig{
DestinationNamespace: "foo",
DestinationName: "web",
DestinationDatacenter: "ny1",
Datacenter: "ny1",
DestinationType: "service",
},
want: &connect.ConsulResolver{
@ -80,7 +42,7 @@ func TestUpstreamResolverFromClient(t *testing.T) {
cfg: UpstreamConfig{
DestinationNamespace: "foo",
DestinationName: "web",
DestinationDatacenter: "ny1",
Datacenter: "ny1",
DestinationType: "prepared_query",
},
want: &connect.ConsulResolver{
@ -95,7 +57,7 @@ func TestUpstreamResolverFromClient(t *testing.T) {
cfg: UpstreamConfig{
DestinationNamespace: "foo",
DestinationName: "web",
DestinationDatacenter: "ny1",
Datacenter: "ny1",
DestinationType: "junk",
},
want: &connect.ConsulResolver{
@ -109,7 +71,9 @@ func TestUpstreamResolverFromClient(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Client doesn't really matter as long as it's passed through.
got := UpstreamResolverFromClient(nil, tt.cfg)
gotFn := UpstreamResolverFuncFromClient(nil)
got, err := gotFn(tt.cfg)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
@ -143,11 +107,11 @@ func TestAgentConfigWatcher(t *testing.T) {
"bind_port": 1010,
"local_service_address": "127.0.0.1:5000",
"handshake_timeout_ms": 999,
"upstreams": []interface{}{
map[string]interface{}{
"destination_name": "db",
"local_bind_port": 9191,
},
Upstreams: []api.Upstream{
{
DestinationName: "db",
LocalBindPort: 9191,
},
},
},
@ -179,7 +143,6 @@ func TestAgentConfigWatcher(t *testing.T) {
DestinationType: "service",
LocalBindPort: 9191,
LocalBindAddress: "127.0.0.1",
ConnectTimeoutMs: 10000, // from applyDefaults
},
},
}
@ -190,13 +153,12 @@ func TestAgentConfigWatcher(t *testing.T) {
go func() {
// Wait for watcher to be watching
time.Sleep(20 * time.Millisecond)
upstreams := reg.Connect.Proxy.Config["upstreams"].([]interface{})
upstreams = append(upstreams, map[string]interface{}{
"destination_name": "cache",
"local_bind_port": 9292,
"local_bind_address": "127.10.10.10",
reg.Connect.Proxy.Upstreams = append(reg.Connect.Proxy.Upstreams,
api.Upstream{
DestinationName: "cache",
LocalBindPort: 9292,
LocalBindAddress: "127.10.10.10",
})
reg.Connect.Proxy.Config["upstreams"] = upstreams
reg.Connect.Proxy.Config["local_connect_timeout_ms"] = 444
err := agent.ServiceRegister(reg)
require.NoError(t, err)
@ -208,7 +170,6 @@ func TestAgentConfigWatcher(t *testing.T) {
DestinationName: "cache",
DestinationNamespace: "default",
DestinationType: "service",
ConnectTimeoutMs: 10000, // from applyDefaults
LocalBindPort: 9292,
LocalBindAddress: "127.10.10.10",
})

View File

@ -12,6 +12,7 @@ import (
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
)
@ -27,7 +28,7 @@ type Listener struct {
// Service is the connect service instance to use.
Service *connect.Service
// listenFunc, dialFunc and bindAddr are set by type-specific constructors
// listenFunc, dialFunc, and bindAddr are set by type-specific constructors.
listenFunc func() (net.Listener, error)
dialFunc func() (net.Conn, error)
bindAddr string
@ -86,7 +87,14 @@ func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig,
// NewUpstreamListener returns a Listener setup to listen locally for TCP
// connections that are proxied to a discovered Connect service instance.
func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
func NewUpstreamListener(svc *connect.Service, client *api.Client,
cfg UpstreamConfig, logger *log.Logger) *Listener {
return newUpstreamListenerWithResolver(svc, cfg,
UpstreamResolverFuncFromClient(client), logger)
}
func newUpstreamListenerWithResolver(svc *connect.Service, cfg UpstreamConfig,
resolverFunc func(UpstreamConfig) (connect.Resolver, error),
logger *log.Logger) *Listener {
bindAddr := fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort)
return &Listener{
@ -95,13 +103,14 @@ func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
return net.Listen("tcp", bindAddr)
},
dialFunc: func() (net.Conn, error) {
if cfg.resolver == nil {
return nil, errors.New("no resolver provided")
rf, err := resolverFunc(cfg)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(),
time.Duration(cfg.ConnectTimeoutMs)*time.Millisecond)
cfg.ConnectTimeout())
defer cancel()
return svc.Dial(ctx, cfg.resolver)
return svc.Dial(ctx, rf)
},
bindAddr: bindAddr,
stopChan: make(chan struct{}),
@ -111,7 +120,7 @@ func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
metricLabels: []metrics.Label{
{Name: "src", Value: svc.Name()},
// TODO(banks): namespace support
{Name: "dst_type", Value: cfg.DestinationType},
{Name: "dst_type", Value: string(cfg.DestinationType)},
{Name: "dst", Value: cfg.DestinationName},
},
}

View File

@ -176,13 +176,9 @@ func TestUpstreamListener(t *testing.T) {
DestinationType: "service",
DestinationNamespace: "default",
DestinationName: "db",
ConnectTimeoutMs: 100,
Config: map[string]interface{}{"connect_timeout_ms": 100},
LocalBindAddress: "localhost",
LocalBindPort: ports[0],
resolver: &connect.StaticResolver{
Addr: testSvr.Addr,
CertURI: agConnect.TestSpiffeIDService(t, "db"),
},
}
// Setup metrics to test they are recorded
@ -190,7 +186,12 @@ func TestUpstreamListener(t *testing.T) {
svc := connect.TestService(t, "web", ca)
l := NewUpstreamListener(svc, cfg, log.New(os.Stderr, "", log.LstdFlags))
// Setup with a statuc resolver instead
rf := TestStaticUpstreamResolverFunc(&connect.StaticResolver{
Addr: testSvr.Addr,
CertURI: agConnect.TestSpiffeIDService(t, "db"),
})
l := newUpstreamListenerWithResolver(svc, cfg, rf, log.New(os.Stderr, "", log.LstdFlags))
// Run proxy
go func() {

View File

@ -96,7 +96,6 @@ func (p *Proxy) Serve() error {
// start one of each and stop/modify if changes occur.
for _, uc := range newCfg.Upstreams {
uc.applyDefaults()
uc.resolver = UpstreamResolverFromClient(p.client, uc)
if uc.LocalBindPort < 1 {
p.logger.Printf("[ERR] upstream %s has no local_bind_port. "+
@ -104,7 +103,7 @@ func (p *Proxy) Serve() error {
continue
}
l := NewUpstreamListener(p.service, uc, p.logger)
l := NewUpstreamListener(p.service, p.client, uc, p.logger)
err := p.startListener(uc.String(), l)
if err != nil {
p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(),

View File

@ -1,27 +0,0 @@
# Example proxy config with everything specified
token = "11111111-2222-3333-4444-555555555555"
proxied_service_name = "web"
proxied_service_namespace = "default"
public_listener {
bind_address = "127.0.0.1"
bind_port= "9999"
local_service_address = "127.0.0.1:5000"
}
upstreams = [
{
local_bind_address = "127.0.0.1:6000"
destination_name = "db"
destination_namespace = "default"
destination_type = "service"
},
{
local_bind_address = "127.0.0.1:6001"
destination_name = "geo-cache"
destination_namespace = "default"
destination_type = "prepared_query"
}
]

View File

@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/lib/freeport"
"github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
@ -110,3 +111,11 @@ func TestEchoConn(t testing.T, conn net.Conn, prefix string) {
// see PR #4498
time.Sleep(time.Millisecond)
}
// TestStaticUpstreamResolverFunc returns a function that will return a static
// resolver for testing UpstreamListener.
func TestStaticUpstreamResolverFunc(r connect.Resolver) func(UpstreamConfig) (connect.Resolver, error) {
return func(UpstreamConfig) (connect.Resolver, error) {
return r, nil
}
}

View File

@ -170,7 +170,7 @@ func (cr *ConsulResolver) resolveServiceEntry(entry *api.ServiceEntry) (string,
}
port := entry.Service.Port
service := entry.Service.ProxyDestination
service := entry.Service.Proxy.DestinationServiceName
if entry.Service.Connect != nil && entry.Service.Connect.Native {
service = entry.Service.Service
}

View File

@ -62,7 +62,9 @@ func TestConsulResolver_Resolve(t *testing.T) {
Kind: "connect-proxy",
Name: "web-proxy",
Port: 9090,
ProxyDestination: "web",
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: "web",
},
}
err = client.Agent().ServiceRegister(regProxy)
require.Nil(t, err)

View File

@ -275,7 +275,22 @@ $ curl \
"bind_address": "127.0.0.1",
"bind_port": 20199,
"local_service_address": "127.0.0.1:8181"
},
"Upstreams": [
{
"DestinationType": "service",
"DestinationName": "db",
"LocalBindPort": 1234,
"Config": {
"connect_timeout_ms": 1000
}
},
{
"DestinationType": "prepared_query",
"DestinationName": "geo-cache",
"LocalBindPort": 1235
}
]
}
```
@ -295,3 +310,7 @@ $ curl \
- `Config` `(map<string|any>)` - The configuration for the managed proxy. This
is a map of primitive values (including arrays and maps) that is set by the
user.
- `Upstreams` `(array<Upstream>)` - The configured upstreams for the proxy. See
[Upstream Configuration Reference](/docs/connect/proxies.html#upstream-configuration-reference)
for more details on the format.

View File

@ -53,11 +53,12 @@ $ curl \
"ID": "redis",
"Service": "redis",
"Tags": [],
"Address": "",
"Meta": {
"redis_version": "4.0"
},
"Port": 8000
"Port": 8000,
"Address": "",
"EnableTagOverride": false
}
}
```
@ -72,7 +73,7 @@ sending updates about its local services to the servers to keep the global
catalog in sync.
For "connect-proxy" kind services, the `service:write` ACL for the
`ProxyDestination` value is also required to register the service.
`Proxy.DestinationServiceName` value is also required to register the service.
| Method | Path | Produces |
| ------ | ---------------------------- | -------------------------- |
@ -90,6 +91,9 @@ The table below shows this endpoint's support for
### Parameters
Note that this endpoint, unlike most also [supports `snake_case`](/docs/agent/services.html#service-definition-parameter-case)
service definition keys for compatibility with the config file format.
- `Name` `(string: <required>)` - Specifies the logical name of the service.
Many service instances may share the same logical service name.
@ -114,14 +118,20 @@ The table below shows this endpoint's support for
services that are [Connect-capable](/docs/connect/index.html)
proxies representing another service.
- `ProxyDestination` `(string: "")` - For "connect-proxy" `Kind` services,
this must be set to the name of the service that the proxy represents. This
service doesn't need to be registered, but the caller must have an ACL token
with permissions for this service.
- `ProxyDestination` `(string: "")` - **Deprecated** From 1.2.0 to 1.2.3 this
was used for "connect-proxy" `Kind` services however the equivalent field is
now in `Proxy.DestinationServiceName`. Registrations using this field will
continue to work until some later major version where this will be removed
entirely. It's strongly recommended to switch to using the new field.
- `Connect` `(Connect: nil)` - Specifies the configuration for
[Connect](/docs/connect/index.html). See the [Connect structure](#connect-structure)
section for supported fields.
- `Proxy` `(Proxy: nil)` - From 1.2.3 on, specifies the configuration for a
Connect proxy instance. This is only valid if `Kind == "connect-proxy"`. See
the [Unmanaged Proxy](/docs/connect/proxies.html#unmanaged-proxies)
documentation for full details.
- `Connect` `(Connect: nil)` - Specifies the
[configuration for Connect](/docs/connect/configuration.html). See the
[Connect Structure](#connect-structure) section below for supported fields.
- `Check` `(Check: nil)` - Specifies a check. Please see the
[check documentation](/api/agent/check.html) for more information about the
@ -168,6 +178,10 @@ For the `Connect` field, the parameters are:
the [Connect](/docs/connect/index.html) protocol [natively](/docs/connect/native.html).
If this is true, then Connect proxies, DNS queries, etc. will be able to
service discover this service.
- `Proxy` `(Proxy: nil)` - Specifies that a managed Connect proxy should be
started for this service instance, and optionally provides configuration for
the proxy. The format is as documented in
[Managed Proxies](/docs/connect/proxies.html#managed-proxies) .
### Sample Payload

View File

@ -446,7 +446,20 @@ $ curl \
},
"ServiceTags": [
"tacos"
]
],
"ServiceProxyDestination": "",
"ServiceProxy": {
"DestinationServiceName": "",
"DestinationServiceID": "",
"LocalServiceAddress": "",
"LocalServicePort": 0,
"Config": null,
"Upstreams": null
},
"ServiceConnect": {
"Native": false,
"Proxy": null
},
}
]
```
@ -488,11 +501,16 @@ $ curl \
- `ServiceKind` is the kind of service, usually "". See the Agent
registration API for more information.
- `ServiceProxyDestination` is the name of the service that is being proxied,
for "connect-proxy" type services.
- `ServiceProxyDestination` **Deprecated** this field duplicates
`ServiceProxy.DestinationServiceName` for backwards compatibility. It will be
removed in a future major version release.
- `ServiceProxy` is the proxy config as specified in
[Unmanaged Proxies](/docs/connect/proxies.html#unmanaged-proxies).
- `ServiceConnect` are the [Connect](/docs/connect/index.html) settings. The
value of this struct is equivalent to the `Connect` field for service registration.
value of this struct is equivalent to the `Connect` field for service
registration.
## List Nodes for Connect-capable Service

View File

@ -17,10 +17,10 @@ or added at runtime over the HTTP interface.
## Service Definition
To configure a service, either provide the service definition as a `-config-file` option to
the agent or place it inside the `-config-dir` of the agent. The file
must end in the `.json` or `.hcl` extension to be loaded by Consul. Check
definitions can be updated by sending a `SIGHUP` to the agent.
To configure a service, either provide the service definition as a
`-config-file` option to the agent or place it inside the `-config-dir` of the
agent. The file must end in the `.json` or `.hcl` extension to be loaded by
Consul. Check definitions can be updated by sending a `SIGHUP` to the agent.
Alternatively, the service can be registered dynamically using the [HTTP
API](/api/index.html).
@ -46,6 +46,14 @@ example shows all possible fields, but note that only a few are required.
],
"kind": "connect-proxy",
"proxy_destination": "redis",
"proxy": {
"destination_service_name": "redis",
"destination_service_id": "redis1",
"local_service_name": "127.0.0.1",
"local_service_port": 9090,
"config": {},
"upstreams": []
}
"connect": {
"native": false,
"proxy": {
@ -88,9 +96,13 @@ and all the instances of a given service have their own copy of it.
The `kind` field is used to optionally identify the service as an [unmanaged
Connect proxy](/docs/connect/proxies.html#unmanaged-proxies) instance with the
value `connect-proxy`. For typical non-proxy instances the `kind` field must be
omitted. The `proxy_destination` field is also required for unmanaged proxy
registrations and is only valid if `kind` is `connect-proxy`. It's value must be
the _name_ of the service that the proxy is handling traffic for.
omitted. The `proxy` field is also required for unmanaged proxy registrations
and is only valid if `kind` is `connect-proxy`. The only required `proxy` field
is `destination_service_name`. From version 1.2.0 to 1.3.0 this was specified
using `proxy_destination` which still works but is now deprecated. See the
[unmanaged proxy
configuration](/docs/connect/proxies.html#complete-configuration-example)
documentation for full details.
The `connect` field can be specified to configure
[Connect](/docs/connect/index.html) for a service. This field is available in
@ -220,3 +232,11 @@ recommended to always use DNS-compliant service and tag names.
DNS-compliant service and tag names may contain any alpha-numeric characters, as
well as dashes. Dots are not supported because Consul internally uses them to
delimit service tags.
## Service Definition Parameter Case
For historical reasons Consul's API uses `CamelCased` parameter names in
responses, however it's configuration file syntax borrowed from HCL uses
`snake_case`. For this reason the registration APIs accept both cases for
service definition parameters although APIs will return the listings using
`CamelCase`.

View File

@ -48,15 +48,15 @@ needed for a secure deployment.
## Built-In Proxy Options
This is a complete example of all the configuration options available for the
built-in proxy. Note that only the `service.connect.proxy.config` map is being
described here, the rest of the service definition is shown for context and is
[described elsewhere](/docs/connect/proxies.html#managed-proxies).
built-in proxy. Note that only the `service.connect.proxy.config` and
`service.connect.proxy.upsteams[].config` maps are being described here, the
rest of the service definition is shown for context but is [described
elsewhere](/docs/connect/proxies.html#managed-proxies).
```javascript
{
"service": {
"name": "web",
"port": 8080,
...
"connect": {
"proxy": {
"config": {
@ -67,24 +67,23 @@ described here, the rest of the service definition is shown for context and is
"local_service_address": "127.0.0.1:1234",
"local_connect_timeout_ms": 1000,
"handshake_timeout_ms": 10000,
"upstreams": [...]
},
"upstreams": [
{
"destination_type": "service",
"destination_name": "redis",
"destination_datacenter": "dc1",
"local_bind_address": "127.0.0.1",
"local_bind_port": 1234,
"connect_timeout_ms": 10000
},
]
...
"config": {
"connect_timeout_ms": 1000
}
}
]
}
}
}
}
```
#### Configuration Key Reference
#### Proxy Config Key Reference
All fields are optional with a sane default.
@ -131,25 +130,19 @@ All fields are optional with a sane default.
number of milliseconds the proxy will wait for _incoming_ mTLS connections to
complete the TLS handshake. Defaults to `10000` or 10 seconds.
* <a name="upstreams"></a><a href="#upstreams">`upstreams`</a> - An array of
upstream definitions for remote services that the proxied
application needs to make outgoing connections to. Each definition has the
following fields:
* <a name="destination_name"></a><a href="#destination_name">`destination_name`</a> -
[required] The name of the service or prepared query to route connect to.
* <a name="local_bind_port"></a><a href="#local_bind_port">`local_bind_port`</a> -
[required] The port to bind a local listener to for the application to
make outbound connections to this upstream.
* <a name="local_bind_address"></a><a href="#local_bind_address">`local_bind_address`</a> -
The address to bind a local listener to for the application to make
outbound connections to this upstream.
* <a name="destination_type"></a><a href="#destination_type">`destination_type`</a> -
Either `service` or `upstream`. The type of discovery query to use to find
an instance to connect to. Defaults to `service`.
* <a name="destination_datacenter"></a><a href="#destination_datacenter">`destination_datacenter`</a> -
The datacenter to issue the discovery query too. Defaults to the local datacenter.
* <a name="connect_timeout_ms"></a><a href="#connect_timeout_ms">`connect_timeout_ms`</a> -
The number of milliseconds the proxy will wait to establish a connection to
and complete TLS handshake with the _remote_ application or proxy. Defaults
to `10000` or 10 seconds.
* <a name="upstreams"></a><a href="#upstreams">`upstreams`</a> - **Deprecated**
Upstreams are now specified in the `connect.proxy` definition. Upstreams
specified in the opaque config map here will continue to work for
compatibility but it's strongly recommended that you move to using the higher
level [upstream
configuration](http://localhost:4567/docs/connect/proxies.html#upstream-configuration).
#### Proxy Upstream Config Key Reference
All fields are optional with a sane default.
* <a name="connect_timeout_ms"></a><a
href="#connect_timeout_ms">`connect_timeout_ms`</a> - The number of
milliseconds the proxy will wait to establish a TLS connection to the
discovered upstream instance before giving up. Defaults to `10000` or 10
seconds.

View File

@ -111,7 +111,6 @@ proxy configuration:
"port": 8080,
"connect": {
"proxy": {
"config": {
"upstreams": [{
"destination_name": "redis",
"local_bind_port": 1234
@ -120,7 +119,6 @@ proxy configuration:
}
}
}
}
```
In the example above,
@ -134,8 +132,17 @@ static port will be able to masquerade as the source service ("web" in the
example above). You must either trust any loopback access on that port or
use namespacing techniques provided by your operating system.
For full details of the configurable options available see the [built-in proxy
configuration reference](/docs/connect/configuration.html#built-in-proxy-options).
-> **Deprecation Note:** versions 1.2.0 to 1.3.0 required specifying `upstreams`
as part of the opaque `config` that is passed to the proxy. However, since
1.3.0, the `upstreams` configuration is now specified directily under the
`proxy` key. Old service definitions using the nested config will continue to
work and have the values copied into the new location. This allows the upstreams
to be registered centrally rather than being part of the local-only config
passed to the proxy instance.
For full details of the additional configurable options available when using the
built-in proxy see the [built-in proxy configuration
reference](/docs/connect/configuration.html#built-in-proxy-options).
### Prepared Query Upstreams
@ -157,7 +164,6 @@ service.
"port": 8080,
"connect": {
"proxy": {
"config": {
"upstreams": [{
"destination_name": "redis",
"destination_type": "prepared_query",
@ -167,7 +173,6 @@ service.
}
}
}
}
```
-> **Note:** Connect does not currently support cross-datacenter
@ -176,8 +181,9 @@ only be used to discover services within a single datacenter. See
[Multi-Datacenter Connect](/docs/connect/index.html#multi-datacenter) for
more information.
For full details of the configurable options available see the [built-in proxy
configuration reference](/docs/connect/configuration.html#built-in-proxy-options).
For full details of the additional configurable options available when using the
built-in proxy see the [built-in proxy configuration
reference](/docs/connect/configuration.html#built-in-proxy-options).
### Dynamic Upstreams
@ -187,6 +193,54 @@ with Connect. After natively integrating, the HTTP API or
[DNS interface](/docs/agent/dns.html#connect-capable-service-lookups)
can be used.
### Upstream Configuration Reference
The following example shows all possible upstream configuration parameters.
Note that in versions 1.2.0 to 1.3.0, managed proxy upstreams were specified
inside the opaque `connect.proxy.config` map. The format is almost unchanged
however managed proxy upstreams are now defined a level up in the
`connect.proxy.upstreams`. The old location is deprecated and will be
automatically converted into the new for an interim period before support is
dropped in a future major release. The only difference in format between the
upstream defintions is that the field `destination_datacenter` has been renamed
to `datacenter` to reflect that it's the discovery target and not necessarily
the same as the instance that will be returned in the case of a prepared query
that fails over to another datacenter.
Note that `snake_case` is used here as it works in both [config file and API
registrations](/docs/agent/services.html#service-definition-parameter-case).
```json
{
"destination_type": "service",
"destination_name": "redis",
"datacenter": "dc1",
"local_bind_address": "127.0.0.1",
"local_bind_port": 1234,
"config": {}
},
```
* `destination_name` `string: <required>` - Specifies the name of the service or
prepared query to route connect to.
* `local_bind_port` `int: <required>` - Specifies the port to bind a local
listener to for the application to make outbound connections to this upstream.
* `local_bind_address` `string: <optional>` - Specifies the address to bind a
local listener to for the application to make outbound connections to this
upstream. Defaults to `127.0.0.1`.
* `destination_type` `string: <optional>` - Speficied the type of discovery
query to use to find an instance to connect to. Valid values are `service` or
`prepared_query`. Defaults to `service`.
* `datacenter` `string: <optional>` - Specifies the datacenter to issue the
discovery query too. Defaults to the local datacenter.
* `config` `object: <optional>` - Specifies opaque configuration options that
will be provided to the proxy instance for this specific upstream. Can contain
any valid JSON object. This might be used to configure proxy-specific features
like timeouts or retries for the given upstream. See the
[built-in proxy configuration reference](/docs/connect/configuration.html#built-in-proxy-options)
for options available when using the built-in proxy.
### Custom Managed Proxy
[Custom proxies](/docs/connect/proxies/integrate.html) can also be
@ -201,7 +255,10 @@ an alternate command to execute for the proxy:
"connect": {
"proxy": {
"exec_mode": "daemon",
"command": ["/usr/bin/my-proxy", "-flag-example"]
"command": ["/usr/bin/my-proxy", "-flag-example"],
"config": {
"foo": "bar"
}
}
}
}
@ -230,6 +287,12 @@ connect {
With this configuration, all services registered without an explicit
proxy command will use `my-proxy` instead of the default built-in proxy.
The `config` key is an optional opaque JSON object which will be passed through
to the proxy via the proxy configuration endpoint to allow any configuration
options the proxy needs to be specified. See the [built-in proxy
configuration reference](/docs/connect/configuration.html#built-in-proxy-options)
for details of config options that can be passed when using the built-in proxy.
### Managed Proxy Logs
Managed proxies have both stdout and stderr captured in log files in the agent's
@ -254,28 +317,84 @@ be started, configured, and stopped manually by some external process such
as an operator or scheduler.
To declare a service as a proxy, the service definition must contain
at least two additional fields:
the following fields:
* `Kind` (string) must be set to `connect-proxy`. This declares that the
* `kind` (string) must be set to `connect-proxy`. This declares that the
service is a proxy type.
* `ProxyDestination` (string) must be set to the service that this proxy
is representing.
* `proxy.destination_service_name` (string) must be set to the service that this
proxy is representing. Note that this replaces `proxy_destination` in
versions 1.2.0 to 1.3.0.
* `Port` must be set so that other Connect services can discover the exact
address for connections. `Address` is optional if the service is being
* `port` must be set so that other Connect services can discover the exact
address for connections. `address` is optional if the service is being
registered against an agent, since it'll inherit the node address.
Example:
Minimal Example:
```json
{
"Name": "redis-proxy",
"Kind": "connect-proxy",
"ProxyDestination": "redis",
"Port": 8181
"name": "redis-proxy",
"kind": "connect-proxy",
"proxy": {
"destination_service_name": "redis"
},
"port": 8181
}
```
With this service registered, any Connect proxies searching for a
Connect-capable endpoint for "redis" will find this proxy.
### Complete Configuration Example
The following is a complete example showing all the options available when
registering an unmanaged proxy instance.
```json
{
"name": "redis-proxy",
"kind": "connect-proxy",
"proxy": {
"destination_service_name": "redis",
"destination_service_id": "redis1",
"local_service_address": "127.0.0.1",
"local_service_port": 9090,
"config": {},
"upstreams": []
},
"port": 8181
}
```
#### Proxy Parameters
- `destination_service_name` `string: <required>` - Specifies the _name_ of the
service this instance is proxying. Both side-car and centralized
load-balancing proxies must specify this. It is used during service
discovery to find the correct proxy instances to route to for a given service
name.
- `destination_service_id` `string: <optional>` - Specifies the _ID_ of a single
specific service instance that this proxy is representing. This is only valid
for side-car style proxies that run on the same node. It is assumed that the
service instance is registered via the same Consul agent so the ID is unique
and has no node qualifier. This is useful to show in tooling which proxy
instance is a side-car for which application instance and will enable
fine-grained analysis of the metrics coming from the proxy.
- `local_service_address` `string: <optional>` - Specifies the address a side-car
proxy should attempt to connect to the local application instance on.
Defaults to 127.0.0.1.
- `local_service_port` `int: <optional>` - Specifies the port a side-car
proxy should attempt to connect to the local application instance on.
Defaults to the port advertised by the service instance identified by
`destination_service_id` if it exists otherwise it may be empty in responses.
- `config` `object: <optional>` - Specifies opaque config JSON that will be
stored and returned along with the service instance from future API calls.
- `upstreams` `array<Upstream>: <optional>` - Specifies the upstream services
this proxy should create listeners for. The format is defined in
[Upstream Configuration Reference](#upstream-configuration-reference).