mirror of https://github.com/status-im/consul.git
Add support for merge-central-config query param (#13001)
Adds a new query param merge-central-config for use with the below endpoints: /catalog/service/:service /catalog/connect/:service /health/service/:service /health/connect/:service If set on the request, the response will include a fully resolved service definition which is merged with the proxy-defaults/global and service-defaults/:service config entries (on-demand style). This is useful to view the full service definition for a mesh service (connect-proxy kind or gateway kind) which might not be merged before being written into the catalog (example: in case of services in the agentless model).
This commit is contained in:
parent
31526139fd
commit
d8d8c8603e
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:enhancement
|
||||||
|
api: `merge-central-config` query parameter support added to some catalog and health endpoints to view a fully resolved service definition (especially when not written into the catalog that way).
|
||||||
|
```
|
|
@ -356,6 +356,10 @@ func (s *HTTPHandlers) catalogServiceNodes(resp http.ResponseWriter, req *http.R
|
||||||
args.TagFilter = true
|
args.TagFilter = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := params["merge-central-config"]; ok {
|
||||||
|
args.MergeCentralConfig = true
|
||||||
|
}
|
||||||
|
|
||||||
// Pull out the service name
|
// Pull out the service name
|
||||||
var err error
|
var err error
|
||||||
args.ServiceName, err = getPathSuffixUnescaped(req.URL.Path, pathPrefix)
|
args.ServiceName, err = getPathSuffixUnescaped(req.URL.Path, pathPrefix)
|
||||||
|
|
|
@ -1052,6 +1052,236 @@ func TestCatalogServiceNodes_ConnectProxy(t *testing.T) {
|
||||||
assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
|
assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs.ProxyConfigEntry) {
|
||||||
|
t.Helper()
|
||||||
|
// Register proxy-defaults
|
||||||
|
proxyGlobalEntry = structs.ProxyConfigEntry{
|
||||||
|
Kind: structs.ProxyDefaults,
|
||||||
|
Name: structs.ProxyConfigGlobal,
|
||||||
|
Mode: structs.ProxyModeDirect,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"local_connect_timeout_ms": uint64(1000),
|
||||||
|
"handshake_timeout_ms": uint64(1000),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
proxyDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
|
||||||
|
Op: structs.ConfigEntryUpsert,
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Entry: &proxyGlobalEntry,
|
||||||
|
}
|
||||||
|
var proxyDefaultsConfigEntryResp bool
|
||||||
|
assert.Nil(t, a.RPC("ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (serviceDefaultsConfigEntry structs.ServiceConfigEntry) {
|
||||||
|
t.Helper()
|
||||||
|
limits := 512
|
||||||
|
serviceDefaultsConfigEntry = structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: serviceName,
|
||||||
|
Mode: structs.ProxyModeTransparent,
|
||||||
|
UpstreamConfig: &structs.UpstreamConfiguration{
|
||||||
|
Defaults: &structs.UpstreamConfig{
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeLocal,
|
||||||
|
},
|
||||||
|
Limits: &structs.UpstreamLimits{
|
||||||
|
MaxConnections: &limits,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
serviceDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
|
||||||
|
Op: structs.ConfigEntryUpsert,
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Entry: &serviceDefaultsConfigEntry,
|
||||||
|
}
|
||||||
|
var serviceDefaultsConfigEntryResp bool
|
||||||
|
assert.Nil(t, a.RPC("ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateMergeCentralConfigResponse(t *testing.T, v *structs.ServiceNode,
|
||||||
|
registerServiceReq *structs.RegisterRequest,
|
||||||
|
proxyGlobalEntry structs.ProxyConfigEntry,
|
||||||
|
serviceDefaultsConfigEntry structs.ServiceConfigEntry) {
|
||||||
|
|
||||||
|
t.Helper()
|
||||||
|
assert.Equal(t, registerServiceReq.Service.Service, v.ServiceName)
|
||||||
|
// validate proxy global defaults are resolved in the merged service config
|
||||||
|
assert.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config)
|
||||||
|
// validate service defaults override proxy-defaults/global
|
||||||
|
assert.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode)
|
||||||
|
assert.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode)
|
||||||
|
// validate service defaults are resolved in the merged service config
|
||||||
|
// expected number of upstreams = (number of upstreams defined in the register request proxy config +
|
||||||
|
// 1 default from service defaults)
|
||||||
|
assert.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams))
|
||||||
|
for _, up := range v.ServiceProxy.Upstreams {
|
||||||
|
if up.DestinationType != "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
assert.Contains(t, up.Config, "limits")
|
||||||
|
upstreamLimits := up.Config["limits"].(*structs.UpstreamLimits)
|
||||||
|
assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections)
|
||||||
|
assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListServiceNodes_MergeCentralConfig(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register the service
|
||||||
|
registerServiceReq := structs.TestRegisterRequestProxy(t)
|
||||||
|
var out struct{}
|
||||||
|
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
|
||||||
|
|
||||||
|
// Register proxy-defaults
|
||||||
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||||
|
|
||||||
|
// Register service-defaults
|
||||||
|
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||||
|
|
||||||
|
type testCase struct {
|
||||||
|
testCaseName string
|
||||||
|
serviceName string
|
||||||
|
connect bool
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config", tc.serviceName)
|
||||||
|
if tc.connect {
|
||||||
|
url = fmt.Sprintf("/v1/catalog/connect/%s?merge-central-config", tc.serviceName)
|
||||||
|
}
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
var obj interface{}
|
||||||
|
var err error
|
||||||
|
if tc.connect {
|
||||||
|
obj, err = a.srv.CatalogConnectServiceNodes(resp, req)
|
||||||
|
} else {
|
||||||
|
obj, err = a.srv.CatalogServiceNodes(resp, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
serviceNodes := obj.(structs.ServiceNodes)
|
||||||
|
|
||||||
|
// validate response
|
||||||
|
assert.Len(t, serviceNodes, 1)
|
||||||
|
v := serviceNodes[0]
|
||||||
|
|
||||||
|
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||||
|
}
|
||||||
|
testCases := []testCase{
|
||||||
|
{
|
||||||
|
testCaseName: "List service instances with merge-central-config",
|
||||||
|
serviceName: registerServiceReq.Service.Service,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testCaseName: "List connect capable service instances with merge-central-config",
|
||||||
|
serviceName: registerServiceReq.Service.Proxy.DestinationServiceName,
|
||||||
|
connect: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.testCaseName, func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCatalogServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register the service
|
||||||
|
registerServiceReq := structs.TestRegisterRequestProxy(t)
|
||||||
|
var out struct{}
|
||||||
|
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
|
||||||
|
|
||||||
|
// Register proxy-defaults
|
||||||
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||||
|
|
||||||
|
// Run the query
|
||||||
|
rpcReq := structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: registerServiceReq.Service.Service,
|
||||||
|
MergeCentralConfig: true,
|
||||||
|
}
|
||||||
|
var rpcResp structs.IndexedServiceNodes
|
||||||
|
assert.Nil(t, a.RPC("Catalog.ServiceNodes", &rpcReq, &rpcResp))
|
||||||
|
|
||||||
|
assert.Len(t, rpcResp.ServiceNodes, 1)
|
||||||
|
serviceNode := rpcResp.ServiceNodes[0]
|
||||||
|
assert.Equal(t, registerServiceReq.Service.Service, serviceNode.ServiceName)
|
||||||
|
// validate proxy global defaults are resolved in the merged service config
|
||||||
|
assert.Equal(t, proxyGlobalEntry.Config, serviceNode.ServiceProxy.Config)
|
||||||
|
assert.Equal(t, proxyGlobalEntry.Mode, serviceNode.ServiceProxy.Mode)
|
||||||
|
|
||||||
|
// Async cause a change - register service defaults
|
||||||
|
waitIndex := rpcResp.Index
|
||||||
|
start := time.Now()
|
||||||
|
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
|
||||||
|
go func() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
// Register service-defaults
|
||||||
|
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||||
|
}()
|
||||||
|
|
||||||
|
const waitDuration = 3 * time.Second
|
||||||
|
RUN_BLOCKING_QUERY:
|
||||||
|
url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config&wait=%s&index=%d",
|
||||||
|
registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
idx := getIndex(t, resp)
|
||||||
|
if idx < waitIndex {
|
||||||
|
t.Fatalf("bad index returned: %v", idx)
|
||||||
|
} else if idx == waitIndex {
|
||||||
|
if elapsed > waitDuration {
|
||||||
|
// This should prevent the loop from running longer than the waitDuration
|
||||||
|
t.Fatalf("too slow: %v", elapsed)
|
||||||
|
}
|
||||||
|
goto RUN_BLOCKING_QUERY
|
||||||
|
}
|
||||||
|
// Should block at least 100ms before getting the changed results
|
||||||
|
if elapsed < 100*time.Millisecond {
|
||||||
|
t.Fatalf("too fast: %v", elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceNodes := obj.(structs.ServiceNodes)
|
||||||
|
|
||||||
|
// validate response
|
||||||
|
assert.Len(t, serviceNodes, 1)
|
||||||
|
v := serviceNodes[0]
|
||||||
|
|
||||||
|
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||||
|
}
|
||||||
|
|
||||||
// Test that the Connect-compatible endpoints can be queried for a
|
// Test that the Connect-compatible endpoints can be queried for a
|
||||||
// service via /v1/catalog/connect/:service.
|
// service via /v1/catalog/connect/:service.
|
||||||
func TestCatalogConnectServiceNodes_good(t *testing.T) {
|
func TestCatalogConnectServiceNodes_good(t *testing.T) {
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
|
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
@ -663,6 +664,11 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
priorMergeHash uint64
|
||||||
|
ranMergeOnce bool
|
||||||
|
)
|
||||||
|
|
||||||
err = c.srv.blockingQuery(
|
err = c.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
|
@ -672,10 +678,53 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.ServiceNodes = index, services
|
mergedServices := services
|
||||||
|
|
||||||
|
if args.MergeCentralConfig {
|
||||||
|
var mergedServiceNodes structs.ServiceNodes
|
||||||
|
for _, sn := range services {
|
||||||
|
mergedsn := sn
|
||||||
|
ns := sn.ToNodeService()
|
||||||
|
if ns.IsSidecarProxy() || ns.IsGateway() {
|
||||||
|
cfgIndex, mergedns, err := mergeNodeServiceWithCentralConfig(ws, state, args, ns, c.logger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if cfgIndex > index {
|
||||||
|
index = cfgIndex
|
||||||
|
}
|
||||||
|
mergedsn = mergedns.ToServiceNode(sn.Node)
|
||||||
|
}
|
||||||
|
mergedServiceNodes = append(mergedServiceNodes, mergedsn)
|
||||||
|
}
|
||||||
|
if len(mergedServiceNodes) > 0 {
|
||||||
|
mergedServices = mergedServiceNodes
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate a hash of the mergedServices driving this response.
|
||||||
|
// Use it to determine if the response is identical to a prior wakeup.
|
||||||
|
newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||||
|
}
|
||||||
|
if ranMergeOnce && priorMergeHash == newMergeHash {
|
||||||
|
// the below assignment is not required as the if condition already validates equality,
|
||||||
|
// but makes it more clear that prior value is being reset to the new hash on each run.
|
||||||
|
priorMergeHash = newMergeHash
|
||||||
|
reply.Index = index
|
||||||
|
// NOTE: the prior response is still alive inside of *reply, which is desirable
|
||||||
|
return errNotChanged
|
||||||
|
} else {
|
||||||
|
priorMergeHash = newMergeHash
|
||||||
|
ranMergeOnce = true
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Index, reply.ServiceNodes = index, mergedServices
|
||||||
if len(args.NodeMetaFilters) > 0 {
|
if len(args.NodeMetaFilters) > 0 {
|
||||||
var filtered structs.ServiceNodes
|
var filtered structs.ServiceNodes
|
||||||
for _, service := range services {
|
for _, service := range mergedServices {
|
||||||
if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) {
|
if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) {
|
||||||
filtered = append(filtered, service)
|
filtered = append(filtered, service)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,11 +9,9 @@ import (
|
||||||
"github.com/armon/go-metrics/prometheus"
|
"github.com/armon/go-metrics/prometheus"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
"github.com/mitchellh/copystructure"
|
|
||||||
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/configentry"
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
@ -482,7 +480,6 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch all relevant config entries.
|
// Fetch all relevant config entries.
|
||||||
|
|
||||||
index, entries, err := state.ReadResolvedServiceConfigEntries(
|
index, entries, err := state.ReadResolvedServiceConfigEntries(
|
||||||
ws,
|
ws,
|
||||||
args.Name,
|
args.Name,
|
||||||
|
@ -513,11 +510,12 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
ranOnce = true
|
ranOnce = true
|
||||||
}
|
}
|
||||||
|
|
||||||
thisReply, err := c.computeResolvedServiceConfig(
|
thisReply, err := computeResolvedServiceConfig(
|
||||||
args,
|
args,
|
||||||
upstreamIDs,
|
upstreamIDs,
|
||||||
legacyUpstreams,
|
legacyUpstreams,
|
||||||
entries,
|
entries,
|
||||||
|
c.logger,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -534,214 +532,6 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConfigEntry) computeResolvedServiceConfig(
|
|
||||||
args *structs.ServiceConfigRequest,
|
|
||||||
upstreamIDs []structs.ServiceID,
|
|
||||||
legacyUpstreams bool,
|
|
||||||
entries *configentry.ResolvedServiceConfigSet,
|
|
||||||
) (*structs.ServiceConfigResponse, error) {
|
|
||||||
var thisReply structs.ServiceConfigResponse
|
|
||||||
|
|
||||||
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
|
||||||
|
|
||||||
// TODO(freddy) Refactor this into smaller set of state store functions
|
|
||||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
|
||||||
// blocking query, this function will be rerun and these state store lookups will both be current.
|
|
||||||
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
|
||||||
var proxyConfGlobalProtocol string
|
|
||||||
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
|
|
||||||
if proxyConf != nil {
|
|
||||||
// Apply the proxy defaults to the sidecar's proxy config
|
|
||||||
mapCopy, err := copystructure.Copy(proxyConf.Config)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
|
|
||||||
}
|
|
||||||
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
|
|
||||||
thisReply.Mode = proxyConf.Mode
|
|
||||||
thisReply.TransparentProxy = proxyConf.TransparentProxy
|
|
||||||
thisReply.MeshGateway = proxyConf.MeshGateway
|
|
||||||
thisReply.Expose = proxyConf.Expose
|
|
||||||
|
|
||||||
// Extract the global protocol from proxyConf for upstream configs.
|
|
||||||
rawProtocol := proxyConf.Config["protocol"]
|
|
||||||
if rawProtocol != nil {
|
|
||||||
var ok bool
|
|
||||||
proxyConfGlobalProtocol, ok = rawProtocol.(string)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceConf := entries.GetServiceDefaults(
|
|
||||||
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
|
|
||||||
)
|
|
||||||
if serviceConf != nil {
|
|
||||||
if serviceConf.Expose.Checks {
|
|
||||||
thisReply.Expose.Checks = true
|
|
||||||
}
|
|
||||||
if len(serviceConf.Expose.Paths) >= 1 {
|
|
||||||
thisReply.Expose.Paths = serviceConf.Expose.Paths
|
|
||||||
}
|
|
||||||
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
|
|
||||||
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
|
|
||||||
}
|
|
||||||
if serviceConf.Protocol != "" {
|
|
||||||
if thisReply.ProxyConfig == nil {
|
|
||||||
thisReply.ProxyConfig = make(map[string]interface{})
|
|
||||||
}
|
|
||||||
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
|
|
||||||
}
|
|
||||||
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
|
|
||||||
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
|
|
||||||
}
|
|
||||||
if serviceConf.TransparentProxy.DialedDirectly {
|
|
||||||
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
|
|
||||||
}
|
|
||||||
if serviceConf.Mode != structs.ProxyModeDefault {
|
|
||||||
thisReply.Mode = serviceConf.Mode
|
|
||||||
}
|
|
||||||
|
|
||||||
thisReply.Meta = serviceConf.Meta
|
|
||||||
}
|
|
||||||
|
|
||||||
// First collect all upstreams into a set of seen upstreams.
|
|
||||||
// Upstreams can come from:
|
|
||||||
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
|
|
||||||
// - Implicitly from centralized upstream config in service-defaults
|
|
||||||
seenUpstreams := map[structs.ServiceID]struct{}{}
|
|
||||||
|
|
||||||
var (
|
|
||||||
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
|
|
||||||
|
|
||||||
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
|
|
||||||
// will never be transparent because the service config request does not use the resolved value.
|
|
||||||
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
|
|
||||||
)
|
|
||||||
|
|
||||||
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
|
|
||||||
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
|
|
||||||
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
|
|
||||||
if noUpstreamArgs && !tproxy {
|
|
||||||
return &thisReply, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// First store all upstreams that were provided in the request
|
|
||||||
for _, sid := range upstreamIDs {
|
|
||||||
if _, ok := seenUpstreams[sid]; !ok {
|
|
||||||
seenUpstreams[sid] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then store upstreams inferred from service-defaults and mapify the overrides.
|
|
||||||
var (
|
|
||||||
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
|
|
||||||
upstreamDefaults *structs.UpstreamConfig
|
|
||||||
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
|
||||||
usConfigs = make(map[structs.ServiceID]map[string]interface{})
|
|
||||||
)
|
|
||||||
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
|
|
||||||
for i, override := range serviceConf.UpstreamConfig.Overrides {
|
|
||||||
if override.Name == "" {
|
|
||||||
c.logger.Warn(
|
|
||||||
"Skipping UpstreamConfig.Overrides entry without a required name field",
|
|
||||||
"entryIndex", i,
|
|
||||||
"kind", serviceConf.GetKind(),
|
|
||||||
"name", serviceConf.GetName(),
|
|
||||||
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
|
|
||||||
)
|
|
||||||
continue // skip this impossible condition
|
|
||||||
}
|
|
||||||
seenUpstreams[override.ServiceID()] = struct{}{}
|
|
||||||
upstreamConfigs[override.ServiceID()] = override
|
|
||||||
}
|
|
||||||
if serviceConf.UpstreamConfig.Defaults != nil {
|
|
||||||
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
|
|
||||||
|
|
||||||
// Store the upstream defaults under a wildcard key so that they can be applied to
|
|
||||||
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
|
|
||||||
cfgMap := make(map[string]interface{})
|
|
||||||
upstreamDefaults.MergeInto(cfgMap)
|
|
||||||
|
|
||||||
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
|
|
||||||
usConfigs[wildcard] = cfgMap
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for upstream := range seenUpstreams {
|
|
||||||
resolvedCfg := make(map[string]interface{})
|
|
||||||
|
|
||||||
// The protocol of an upstream is resolved in this order:
|
|
||||||
// 1. Default protocol from proxy-defaults (how all services should be addressed)
|
|
||||||
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
|
|
||||||
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
|
|
||||||
// (how the downstream wants to address it)
|
|
||||||
protocol := proxyConfGlobalProtocol
|
|
||||||
|
|
||||||
upstreamSvcDefaults := entries.GetServiceDefaults(
|
|
||||||
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
|
|
||||||
)
|
|
||||||
if upstreamSvcDefaults != nil {
|
|
||||||
if upstreamSvcDefaults.Protocol != "" {
|
|
||||||
protocol = upstreamSvcDefaults.Protocol
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if protocol != "" {
|
|
||||||
resolvedCfg["protocol"] = protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge centralized defaults for all upstreams before configuration for specific upstreams
|
|
||||||
if upstreamDefaults != nil {
|
|
||||||
upstreamDefaults.MergeInto(resolvedCfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
|
|
||||||
// because it is specific to the proxy instance.
|
|
||||||
//
|
|
||||||
// The goal is to flatten the mesh gateway mode in this order:
|
|
||||||
// 0. Value from centralized upstream_defaults
|
|
||||||
// 1. Value from local proxy registration
|
|
||||||
// 2. Value from centralized upstream_config
|
|
||||||
// 3. Value from local upstream definition. This last step is done in the client's service manager.
|
|
||||||
if !args.MeshGateway.IsZero() {
|
|
||||||
resolvedCfg["mesh_gateway"] = args.MeshGateway
|
|
||||||
}
|
|
||||||
|
|
||||||
if upstreamConfigs[upstream] != nil {
|
|
||||||
upstreamConfigs[upstream].MergeInto(resolvedCfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(resolvedCfg) > 0 {
|
|
||||||
usConfigs[upstream] = resolvedCfg
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// don't allocate the slices just to not fill them
|
|
||||||
if len(usConfigs) == 0 {
|
|
||||||
return &thisReply, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if legacyUpstreams {
|
|
||||||
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
|
|
||||||
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
|
|
||||||
|
|
||||||
for us, conf := range usConfigs {
|
|
||||||
thisReply.UpstreamConfigs[us.ID] = conf
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
|
|
||||||
|
|
||||||
for us, conf := range usConfigs {
|
|
||||||
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
|
|
||||||
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &thisReply, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error {
|
func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error {
|
||||||
// ExportedServices entries are gated from interactions from secondary DCs
|
// ExportedServices entries are gated from interactions from secondary DCs
|
||||||
// because non-default partitions cannot be created in secondaries
|
// because non-default partitions cannot be created in secondaries
|
||||||
|
|
|
@ -6,7 +6,9 @@ import (
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
bexpr "github.com/hashicorp/go-bexpr"
|
bexpr "github.com/hashicorp/go-bexpr"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
@ -15,7 +17,8 @@ import (
|
||||||
|
|
||||||
// Health endpoint is used to query the health information
|
// Health endpoint is used to query the health information
|
||||||
type Health struct {
|
type Health struct {
|
||||||
srv *Server
|
srv *Server
|
||||||
|
logger hclog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChecksInState is used to get all the checks in a given state
|
// ChecksInState is used to get all the checks in a given state
|
||||||
|
@ -232,6 +235,11 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
priorMergeHash uint64
|
||||||
|
ranMergeOnce bool
|
||||||
|
)
|
||||||
|
|
||||||
err = h.srv.blockingQuery(
|
err = h.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
|
@ -243,7 +251,43 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
thisReply.Index, thisReply.Nodes = index, nodes
|
resolvedNodes := nodes
|
||||||
|
if args.MergeCentralConfig {
|
||||||
|
for _, node := range resolvedNodes {
|
||||||
|
ns := node.Service
|
||||||
|
if ns.IsSidecarProxy() || ns.IsGateway() {
|
||||||
|
cfgIndex, mergedns, err := mergeNodeServiceWithCentralConfig(ws, state, args, ns, h.logger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if cfgIndex > index {
|
||||||
|
index = cfgIndex
|
||||||
|
}
|
||||||
|
*node.Service = *mergedns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate a hash of the resolvedNodes driving this response.
|
||||||
|
// Use it to determine if the response is identical to a prior wakeup.
|
||||||
|
newMergeHash, err := hashstructure_v2.Hash(resolvedNodes, hashstructure_v2.FormatV2, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||||
|
}
|
||||||
|
if ranMergeOnce && priorMergeHash == newMergeHash {
|
||||||
|
// the below assignment is not required as the if condition already validates equality,
|
||||||
|
// but makes it more clear that prior value is being reset to the new hash on each run.
|
||||||
|
priorMergeHash = newMergeHash
|
||||||
|
reply.Index = index
|
||||||
|
// NOTE: the prior response is still alive inside of *reply, which is desirable
|
||||||
|
return errNotChanged
|
||||||
|
} else {
|
||||||
|
priorMergeHash = newMergeHash
|
||||||
|
ranMergeOnce = true
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
thisReply.Index, thisReply.Nodes = index, resolvedNodes
|
||||||
|
|
||||||
if len(args.NodeMetaFilters) > 0 {
|
if len(args.NodeMetaFilters) > 0 {
|
||||||
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
|
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
|
||||||
|
|
|
@ -0,0 +1,408 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/configentry"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
"github.com/imdario/mergo"
|
||||||
|
"github.com/mitchellh/copystructure"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mergeNodeServiceWithCentralConfig merges a service instance (NodeService) with the
|
||||||
|
// proxy-defaults/global and service-defaults/:service config entries.
|
||||||
|
// This common helper is used by the blocking query function of different RPC endpoints
|
||||||
|
// that need to return a fully resolved service defintion.
|
||||||
|
func mergeNodeServiceWithCentralConfig(
|
||||||
|
ws memdb.WatchSet,
|
||||||
|
state *state.Store,
|
||||||
|
args *structs.ServiceSpecificRequest,
|
||||||
|
ns *structs.NodeService,
|
||||||
|
logger hclog.Logger) (uint64, *structs.NodeService, error) {
|
||||||
|
|
||||||
|
serviceName := ns.Service
|
||||||
|
var upstreams []structs.ServiceID
|
||||||
|
if ns.IsSidecarProxy() {
|
||||||
|
// This is a sidecar proxy, ignore the proxy service's config since we are
|
||||||
|
// managed by the target service config.
|
||||||
|
serviceName = ns.Proxy.DestinationServiceName
|
||||||
|
|
||||||
|
// Also if we have any upstreams defined, add them to the defaults lookup request
|
||||||
|
// so we can learn about their configs.
|
||||||
|
for _, us := range ns.Proxy.Upstreams {
|
||||||
|
if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService {
|
||||||
|
sid := us.DestinationID()
|
||||||
|
sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta)
|
||||||
|
upstreams = append(upstreams, sid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
configReq := &structs.ServiceConfigRequest{
|
||||||
|
Name: serviceName,
|
||||||
|
Datacenter: args.Datacenter,
|
||||||
|
QueryOptions: args.QueryOptions,
|
||||||
|
MeshGateway: ns.Proxy.MeshGateway,
|
||||||
|
Mode: ns.Proxy.Mode,
|
||||||
|
UpstreamIDs: upstreams,
|
||||||
|
EnterpriseMeta: ns.EnterpriseMeta,
|
||||||
|
}
|
||||||
|
|
||||||
|
// prefer using this vs directly calling the ConfigEntry.ResolveServiceConfig RPC
|
||||||
|
// so as to pass down the same watch set to also watch on changes to
|
||||||
|
// proxy-defaults/global and service-defaults.
|
||||||
|
cfgIndex, configEntries, err := state.ReadResolvedServiceConfigEntries(
|
||||||
|
ws,
|
||||||
|
configReq.Name,
|
||||||
|
&configReq.EnterpriseMeta,
|
||||||
|
upstreams,
|
||||||
|
configReq.Mode,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("Failure looking up service config entries for %s: %v",
|
||||||
|
ns.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defaults, err := computeResolvedServiceConfig(
|
||||||
|
configReq,
|
||||||
|
upstreams,
|
||||||
|
false,
|
||||||
|
configEntries,
|
||||||
|
logger,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("Failure computing service defaults for %s: %v",
|
||||||
|
ns.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mergedns, err := MergeServiceConfig(defaults, ns)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("Failure merging service definition with config entry defaults for %s: %v",
|
||||||
|
ns.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfgIndex, mergedns, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func computeResolvedServiceConfig(
|
||||||
|
args *structs.ServiceConfigRequest,
|
||||||
|
upstreamIDs []structs.ServiceID,
|
||||||
|
legacyUpstreams bool,
|
||||||
|
entries *configentry.ResolvedServiceConfigSet,
|
||||||
|
logger hclog.Logger,
|
||||||
|
) (*structs.ServiceConfigResponse, error) {
|
||||||
|
var thisReply structs.ServiceConfigResponse
|
||||||
|
|
||||||
|
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
||||||
|
|
||||||
|
// TODO(freddy) Refactor this into smaller set of state store functions
|
||||||
|
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
||||||
|
// blocking query, this function will be rerun and these state store lookups will both be current.
|
||||||
|
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
||||||
|
var proxyConfGlobalProtocol string
|
||||||
|
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
|
||||||
|
if proxyConf != nil {
|
||||||
|
// Apply the proxy defaults to the sidecar's proxy config
|
||||||
|
mapCopy, err := copystructure.Copy(proxyConf.Config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
|
||||||
|
}
|
||||||
|
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
|
||||||
|
thisReply.Mode = proxyConf.Mode
|
||||||
|
thisReply.TransparentProxy = proxyConf.TransparentProxy
|
||||||
|
thisReply.MeshGateway = proxyConf.MeshGateway
|
||||||
|
thisReply.Expose = proxyConf.Expose
|
||||||
|
|
||||||
|
// Extract the global protocol from proxyConf for upstream configs.
|
||||||
|
rawProtocol := proxyConf.Config["protocol"]
|
||||||
|
if rawProtocol != nil {
|
||||||
|
var ok bool
|
||||||
|
proxyConfGlobalProtocol, ok = rawProtocol.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceConf := entries.GetServiceDefaults(
|
||||||
|
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
|
||||||
|
)
|
||||||
|
if serviceConf != nil {
|
||||||
|
if serviceConf.Expose.Checks {
|
||||||
|
thisReply.Expose.Checks = true
|
||||||
|
}
|
||||||
|
if len(serviceConf.Expose.Paths) >= 1 {
|
||||||
|
thisReply.Expose.Paths = serviceConf.Expose.Paths
|
||||||
|
}
|
||||||
|
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
|
||||||
|
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
|
||||||
|
}
|
||||||
|
if serviceConf.Protocol != "" {
|
||||||
|
if thisReply.ProxyConfig == nil {
|
||||||
|
thisReply.ProxyConfig = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
|
||||||
|
}
|
||||||
|
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
|
||||||
|
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
|
||||||
|
}
|
||||||
|
if serviceConf.TransparentProxy.DialedDirectly {
|
||||||
|
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
|
||||||
|
}
|
||||||
|
if serviceConf.Mode != structs.ProxyModeDefault {
|
||||||
|
thisReply.Mode = serviceConf.Mode
|
||||||
|
}
|
||||||
|
|
||||||
|
thisReply.Meta = serviceConf.Meta
|
||||||
|
}
|
||||||
|
|
||||||
|
// First collect all upstreams into a set of seen upstreams.
|
||||||
|
// Upstreams can come from:
|
||||||
|
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
|
||||||
|
// - Implicitly from centralized upstream config in service-defaults
|
||||||
|
seenUpstreams := map[structs.ServiceID]struct{}{}
|
||||||
|
|
||||||
|
var (
|
||||||
|
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
|
||||||
|
|
||||||
|
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
|
||||||
|
// will never be transparent because the service config request does not use the resolved value.
|
||||||
|
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
|
||||||
|
)
|
||||||
|
|
||||||
|
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
|
||||||
|
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
|
||||||
|
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
|
||||||
|
if noUpstreamArgs && !tproxy {
|
||||||
|
return &thisReply, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// First store all upstreams that were provided in the request
|
||||||
|
for _, sid := range upstreamIDs {
|
||||||
|
if _, ok := seenUpstreams[sid]; !ok {
|
||||||
|
seenUpstreams[sid] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then store upstreams inferred from service-defaults and mapify the overrides.
|
||||||
|
var (
|
||||||
|
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
|
||||||
|
upstreamDefaults *structs.UpstreamConfig
|
||||||
|
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
||||||
|
usConfigs = make(map[structs.ServiceID]map[string]interface{})
|
||||||
|
)
|
||||||
|
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
|
||||||
|
for i, override := range serviceConf.UpstreamConfig.Overrides {
|
||||||
|
if override.Name == "" {
|
||||||
|
logger.Warn(
|
||||||
|
"Skipping UpstreamConfig.Overrides entry without a required name field",
|
||||||
|
"entryIndex", i,
|
||||||
|
"kind", serviceConf.GetKind(),
|
||||||
|
"name", serviceConf.GetName(),
|
||||||
|
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
|
||||||
|
)
|
||||||
|
continue // skip this impossible condition
|
||||||
|
}
|
||||||
|
seenUpstreams[override.ServiceID()] = struct{}{}
|
||||||
|
upstreamConfigs[override.ServiceID()] = override
|
||||||
|
}
|
||||||
|
if serviceConf.UpstreamConfig.Defaults != nil {
|
||||||
|
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
|
||||||
|
|
||||||
|
// Store the upstream defaults under a wildcard key so that they can be applied to
|
||||||
|
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
|
||||||
|
cfgMap := make(map[string]interface{})
|
||||||
|
upstreamDefaults.MergeInto(cfgMap)
|
||||||
|
|
||||||
|
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
|
||||||
|
usConfigs[wildcard] = cfgMap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for upstream := range seenUpstreams {
|
||||||
|
resolvedCfg := make(map[string]interface{})
|
||||||
|
|
||||||
|
// The protocol of an upstream is resolved in this order:
|
||||||
|
// 1. Default protocol from proxy-defaults (how all services should be addressed)
|
||||||
|
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
|
||||||
|
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
|
||||||
|
// (how the downstream wants to address it)
|
||||||
|
protocol := proxyConfGlobalProtocol
|
||||||
|
|
||||||
|
upstreamSvcDefaults := entries.GetServiceDefaults(
|
||||||
|
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
|
||||||
|
)
|
||||||
|
if upstreamSvcDefaults != nil {
|
||||||
|
if upstreamSvcDefaults.Protocol != "" {
|
||||||
|
protocol = upstreamSvcDefaults.Protocol
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if protocol != "" {
|
||||||
|
resolvedCfg["protocol"] = protocol
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge centralized defaults for all upstreams before configuration for specific upstreams
|
||||||
|
if upstreamDefaults != nil {
|
||||||
|
upstreamDefaults.MergeInto(resolvedCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
|
||||||
|
// because it is specific to the proxy instance.
|
||||||
|
//
|
||||||
|
// The goal is to flatten the mesh gateway mode in this order:
|
||||||
|
// 0. Value from centralized upstream_defaults
|
||||||
|
// 1. Value from local proxy registration
|
||||||
|
// 2. Value from centralized upstream_config
|
||||||
|
// 3. Value from local upstream definition. This last step is done in the client's service manager.
|
||||||
|
if !args.MeshGateway.IsZero() {
|
||||||
|
resolvedCfg["mesh_gateway"] = args.MeshGateway
|
||||||
|
}
|
||||||
|
|
||||||
|
if upstreamConfigs[upstream] != nil {
|
||||||
|
upstreamConfigs[upstream].MergeInto(resolvedCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resolvedCfg) > 0 {
|
||||||
|
usConfigs[upstream] = resolvedCfg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// don't allocate the slices just to not fill them
|
||||||
|
if len(usConfigs) == 0 {
|
||||||
|
return &thisReply, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if legacyUpstreams {
|
||||||
|
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
|
||||||
|
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
|
||||||
|
|
||||||
|
for us, conf := range usConfigs {
|
||||||
|
thisReply.UpstreamConfigs[us.ID] = conf
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
|
||||||
|
|
||||||
|
for us, conf := range usConfigs {
|
||||||
|
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
|
||||||
|
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &thisReply, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MergeServiceConfig merges the service into defaults to produce the final effective
|
||||||
|
// config for the specified service.
|
||||||
|
func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) {
|
||||||
|
if defaults == nil {
|
||||||
|
return service, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't want to change s.registration in place since it is our source of
|
||||||
|
// truth about what was actually registered before defaults applied. So copy
|
||||||
|
// it first.
|
||||||
|
nsRaw, err := copystructure.Copy(service)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge proxy defaults
|
||||||
|
ns := nsRaw.(*structs.NodeService)
|
||||||
|
|
||||||
|
if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
||||||
|
ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode
|
||||||
|
}
|
||||||
|
if ns.Proxy.Mode == structs.ProxyModeDefault {
|
||||||
|
ns.Proxy.Mode = defaults.Mode
|
||||||
|
}
|
||||||
|
if ns.Proxy.TransparentProxy.OutboundListenerPort == 0 {
|
||||||
|
ns.Proxy.TransparentProxy.OutboundListenerPort = defaults.TransparentProxy.OutboundListenerPort
|
||||||
|
}
|
||||||
|
if !ns.Proxy.TransparentProxy.DialedDirectly {
|
||||||
|
ns.Proxy.TransparentProxy.DialedDirectly = defaults.TransparentProxy.DialedDirectly
|
||||||
|
}
|
||||||
|
|
||||||
|
// remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs).
|
||||||
|
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
|
||||||
|
|
||||||
|
for _, us := range defaults.UpstreamIDConfigs {
|
||||||
|
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteUpstreams[us.Upstream] = structs.Upstream{
|
||||||
|
DestinationNamespace: us.Upstream.NamespaceOrDefault(),
|
||||||
|
DestinationPartition: us.Upstream.PartitionOrDefault(),
|
||||||
|
DestinationName: us.Upstream.ID,
|
||||||
|
Config: us.Config,
|
||||||
|
MeshGateway: parsed.MeshGateway,
|
||||||
|
CentrallyConfigured: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
|
||||||
|
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
|
||||||
|
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values.
|
||||||
|
localUpstreams := make(map[structs.ServiceID]struct{})
|
||||||
|
|
||||||
|
// Merge upstream defaults into the local registration
|
||||||
|
for i := range ns.Proxy.Upstreams {
|
||||||
|
// Get a pointer not a value copy of the upstream struct
|
||||||
|
us := &ns.Proxy.Upstreams[i]
|
||||||
|
if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
localUpstreams[us.DestinationID()] = struct{}{}
|
||||||
|
|
||||||
|
remoteCfg, ok := remoteUpstreams[us.DestinationID()]
|
||||||
|
if !ok {
|
||||||
|
// No config defaults to merge
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// The local upstream config mode has the highest precedence, so only overwrite when it's set to the default
|
||||||
|
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
||||||
|
us.MeshGateway.Mode = remoteCfg.MeshGateway.Mode
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge in everything else that is read from the map
|
||||||
|
if err := mergo.Merge(&us.Config, remoteCfg.Config); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the mesh gateway key from opaque config since this is the value that was resolved from
|
||||||
|
// the servers and NOT the final merged value for this upstream.
|
||||||
|
// Note that we use the "mesh_gateway" key and not other variants like "MeshGateway" because
|
||||||
|
// UpstreamConfig.MergeInto and ResolveServiceConfig only use "mesh_gateway".
|
||||||
|
delete(us.Config, "mesh_gateway")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure upstreams present in central config are represented in the local configuration.
|
||||||
|
// This does not apply outside of transparent mode because in that situation every possible upstream already exists
|
||||||
|
// inside of ns.Proxy.Upstreams.
|
||||||
|
if ns.Proxy.Mode == structs.ProxyModeTransparent {
|
||||||
|
for id, remote := range remoteUpstreams {
|
||||||
|
if _, ok := localUpstreams[id]; ok {
|
||||||
|
// Remote upstream is already present locally
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ns, err
|
||||||
|
}
|
|
@ -0,0 +1,458 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/mitchellh/copystructure"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_MergeServiceConfig_TransparentProxy(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
defaults *structs.ServiceConfigResponse
|
||||||
|
service *structs.NodeService
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want *structs.NodeService
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "inherit transparent proxy settings",
|
||||||
|
args: args{
|
||||||
|
defaults: &structs.ServiceConfigResponse{
|
||||||
|
Mode: structs.ProxyModeTransparent,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{
|
||||||
|
OutboundListenerPort: 10101,
|
||||||
|
DialedDirectly: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
service: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeDefault,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeTransparent,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{
|
||||||
|
OutboundListenerPort: 10101,
|
||||||
|
DialedDirectly: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "override transparent proxy settings",
|
||||||
|
args: args{
|
||||||
|
defaults: &structs.ServiceConfigResponse{
|
||||||
|
Mode: structs.ProxyModeTransparent,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{
|
||||||
|
OutboundListenerPort: 10101,
|
||||||
|
DialedDirectly: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
service: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeDirect,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{
|
||||||
|
OutboundListenerPort: 808,
|
||||||
|
DialedDirectly: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeDirect,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{
|
||||||
|
OutboundListenerPort: 808,
|
||||||
|
DialedDirectly: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
got, err := MergeServiceConfig(tt.args.defaults, tt.args.service)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, tt.want, got)
|
||||||
|
|
||||||
|
// The input defaults must not be modified by the merge.
|
||||||
|
// See PR #10647
|
||||||
|
assert.Equal(t, tt.args.defaults, defaultsCopy)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
defaults *structs.ServiceConfigResponse
|
||||||
|
service *structs.NodeService
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want *structs.NodeService
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "new config fields",
|
||||||
|
args: args{
|
||||||
|
defaults: &structs.ServiceConfigResponse{
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: structs.ServiceID{
|
||||||
|
ID: "zap",
|
||||||
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
|
},
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"passive_health_check": map[string]interface{}{
|
||||||
|
"Interval": int64(10),
|
||||||
|
"MaxFailures": int64(2),
|
||||||
|
},
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "local",
|
||||||
|
},
|
||||||
|
"protocol": "grpc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
service: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zap",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zap",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"passive_health_check": map[string]interface{}{
|
||||||
|
"Interval": int64(10),
|
||||||
|
"MaxFailures": int64(2),
|
||||||
|
},
|
||||||
|
"protocol": "grpc",
|
||||||
|
},
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeLocal,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remote upstream config expands local upstream list in transparent mode",
|
||||||
|
args: args{
|
||||||
|
defaults: &structs.ServiceConfigResponse{
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: structs.ServiceID{
|
||||||
|
ID: "zap",
|
||||||
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
|
},
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "grpc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
service: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeTransparent,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{
|
||||||
|
OutboundListenerPort: 10101,
|
||||||
|
DialedDirectly: true,
|
||||||
|
},
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zip",
|
||||||
|
LocalBindPort: 8080,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeTransparent,
|
||||||
|
TransparentProxy: structs.TransparentProxyConfig{
|
||||||
|
OutboundListenerPort: 10101,
|
||||||
|
DialedDirectly: true,
|
||||||
|
},
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zip",
|
||||||
|
LocalBindPort: 8080,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zap",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "grpc",
|
||||||
|
},
|
||||||
|
CentrallyConfigured: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "remote upstream config not added to local upstream list outside of transparent mode",
|
||||||
|
args: args{
|
||||||
|
defaults: &structs.ServiceConfigResponse{
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: structs.ServiceID{
|
||||||
|
ID: "zap",
|
||||||
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
|
},
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "grpc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
service: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeDirect,
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zip",
|
||||||
|
LocalBindPort: 8080,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
Mode: structs.ProxyModeDirect,
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zip",
|
||||||
|
LocalBindPort: 8080,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "upstream mode from remote defaults overrides local default",
|
||||||
|
args: args{
|
||||||
|
defaults: &structs.ServiceConfigResponse{
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: structs.ServiceID{
|
||||||
|
ID: "zap",
|
||||||
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
|
},
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "local",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
service: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeRemote,
|
||||||
|
},
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zap",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeRemote,
|
||||||
|
},
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zap",
|
||||||
|
Config: map[string]interface{}{},
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeLocal,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "mode in local upstream config overrides all",
|
||||||
|
args: args{
|
||||||
|
defaults: &structs.ServiceConfigResponse{
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: structs.ServiceID{
|
||||||
|
ID: "zap",
|
||||||
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
|
},
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "local",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
service: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeRemote,
|
||||||
|
},
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zap",
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeNone,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &structs.NodeService{
|
||||||
|
ID: "foo-proxy",
|
||||||
|
Service: "foo-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "foo",
|
||||||
|
DestinationServiceID: "foo",
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeRemote,
|
||||||
|
},
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
structs.Upstream{
|
||||||
|
DestinationNamespace: "default",
|
||||||
|
DestinationPartition: "default",
|
||||||
|
DestinationName: "zap",
|
||||||
|
Config: map[string]interface{}{},
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeNone,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
got, err := MergeServiceConfig(tt.args.defaults, tt.args.service)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, tt.want, got)
|
||||||
|
|
||||||
|
// The input defaults must not be modified by the merge.
|
||||||
|
// See PR #10647
|
||||||
|
assert.Equal(t, tt.args.defaults, defaultsCopy)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,7 +10,7 @@ func init() {
|
||||||
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} })
|
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} })
|
||||||
registerEndpoint(func(s *Server) interface{} { return &FederationState{s} })
|
registerEndpoint(func(s *Server) interface{} { return &FederationState{s} })
|
||||||
registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} })
|
registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} })
|
||||||
registerEndpoint(func(s *Server) interface{} { return &Health{s} })
|
registerEndpoint(func(s *Server) interface{} { return &Health{s, s.loggers.Named(logging.Health)} })
|
||||||
registerEndpoint(func(s *Server) interface{} { return &Intention{s, s.loggers.Named(logging.Intentions)} })
|
registerEndpoint(func(s *Server) interface{} { return &Intention{s, s.loggers.Named(logging.Intentions)} })
|
||||||
registerEndpoint(func(s *Server) interface{} { return &Internal{s, s.loggers.Named(logging.Internal)} })
|
registerEndpoint(func(s *Server) interface{} { return &Internal{s, s.loggers.Named(logging.Internal)} })
|
||||||
registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} })
|
registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} })
|
||||||
|
|
|
@ -203,6 +203,10 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
|
||||||
args.TagFilter = true
|
args.TagFilter = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := params["merge-central-config"]; ok {
|
||||||
|
args.MergeCentralConfig = true
|
||||||
|
}
|
||||||
|
|
||||||
// Determine the prefix
|
// Determine the prefix
|
||||||
var prefix string
|
var prefix string
|
||||||
switch healthType {
|
switch healthType {
|
||||||
|
|
|
@ -1882,6 +1882,167 @@ func TestFilterNonPassing(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register the service
|
||||||
|
registerServiceReq := structs.TestRegisterRequestProxy(t)
|
||||||
|
registerServiceReq.Check = &structs.HealthCheck{
|
||||||
|
Node: registerServiceReq.Node,
|
||||||
|
Name: "check1",
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
|
||||||
|
|
||||||
|
// Register proxy-defaults
|
||||||
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||||
|
|
||||||
|
// Register service-defaults
|
||||||
|
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||||
|
|
||||||
|
type testCase struct {
|
||||||
|
testCaseName string
|
||||||
|
serviceName string
|
||||||
|
connect bool
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
url := fmt.Sprintf("/v1/health/service/%s?merge-central-config", tc.serviceName)
|
||||||
|
if tc.connect {
|
||||||
|
url = fmt.Sprintf("/v1/health/connect/%s?merge-central-config", tc.serviceName)
|
||||||
|
}
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
var obj interface{}
|
||||||
|
var err error
|
||||||
|
if tc.connect {
|
||||||
|
obj, err = a.srv.HealthConnectServiceNodes(resp, req)
|
||||||
|
} else {
|
||||||
|
obj, err = a.srv.HealthServiceNodes(resp, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
checkServiceNodes := obj.(structs.CheckServiceNodes)
|
||||||
|
|
||||||
|
// validate response
|
||||||
|
assert.Len(t, checkServiceNodes, 1)
|
||||||
|
v := checkServiceNodes[0]
|
||||||
|
|
||||||
|
validateMergeCentralConfigResponse(t, v.Service.ToServiceNode(registerServiceReq.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||||
|
}
|
||||||
|
testCases := []testCase{
|
||||||
|
{
|
||||||
|
testCaseName: "List healthy service instances with merge-central-config",
|
||||||
|
serviceName: registerServiceReq.Service.Service,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
testCaseName: "List healthy connect capable service instances with merge-central-config",
|
||||||
|
serviceName: registerServiceReq.Service.Proxy.DestinationServiceName,
|
||||||
|
connect: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.testCaseName, func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHealthServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register the service
|
||||||
|
registerServiceReq := structs.TestRegisterRequestProxy(t)
|
||||||
|
registerServiceReq.Check = &structs.HealthCheck{
|
||||||
|
Node: registerServiceReq.Node,
|
||||||
|
Name: "check1",
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
|
||||||
|
|
||||||
|
// Register proxy-defaults
|
||||||
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||||
|
|
||||||
|
// Run the query
|
||||||
|
rpcReq := structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: registerServiceReq.Service.Service,
|
||||||
|
MergeCentralConfig: true,
|
||||||
|
}
|
||||||
|
var rpcResp structs.IndexedCheckServiceNodes
|
||||||
|
assert.Nil(t, a.RPC("Health.ServiceNodes", &rpcReq, &rpcResp))
|
||||||
|
|
||||||
|
assert.Len(t, rpcResp.Nodes, 1)
|
||||||
|
nodeService := rpcResp.Nodes[0].Service
|
||||||
|
assert.Equal(t, registerServiceReq.Service.Service, nodeService.Service)
|
||||||
|
// validate proxy global defaults are resolved in the merged service config
|
||||||
|
assert.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config)
|
||||||
|
assert.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode)
|
||||||
|
|
||||||
|
// Async cause a change - register service defaults
|
||||||
|
waitIndex := rpcResp.Index
|
||||||
|
start := time.Now()
|
||||||
|
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
|
||||||
|
go func() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
// Register service-defaults
|
||||||
|
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||||
|
}()
|
||||||
|
|
||||||
|
const waitDuration = 3 * time.Second
|
||||||
|
RUN_BLOCKING_QUERY:
|
||||||
|
url := fmt.Sprintf("/v1/health/service/%s?merge-central-config&wait=%s&index=%d",
|
||||||
|
registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
idx := getIndex(t, resp)
|
||||||
|
if idx < waitIndex {
|
||||||
|
t.Fatalf("bad index returned: %v", idx)
|
||||||
|
} else if idx == waitIndex {
|
||||||
|
if elapsed > waitDuration {
|
||||||
|
// This should prevent the loop from running longer than the waitDuration
|
||||||
|
t.Fatalf("too slow: %v", elapsed)
|
||||||
|
}
|
||||||
|
goto RUN_BLOCKING_QUERY
|
||||||
|
}
|
||||||
|
// Should block at least 100ms before getting the changed results
|
||||||
|
if elapsed < 100*time.Millisecond {
|
||||||
|
t.Fatalf("too fast: %v", elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkServiceNodes := obj.(structs.CheckServiceNodes)
|
||||||
|
|
||||||
|
// validate response
|
||||||
|
assert.Len(t, checkServiceNodes, 1)
|
||||||
|
v := checkServiceNodes[0].Service.ToServiceNode(registerServiceReq.Node)
|
||||||
|
|
||||||
|
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||||
|
}
|
||||||
|
|
||||||
func peerQuerySuffix(peerName string) string {
|
func peerQuerySuffix(peerName string) string {
|
||||||
if peerName == "" {
|
if peerName == "" {
|
||||||
return ""
|
return ""
|
||||||
|
|
|
@ -38,7 +38,9 @@ func (c *Client) ServiceNodes(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req structs.ServiceSpecificRequest,
|
req structs.ServiceSpecificRequest,
|
||||||
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
||||||
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
|
// Note: if MergeCentralConfig is requested, default to using the RPC backend for now
|
||||||
|
// as the streaming backend and materializer does not have support for merging yet.
|
||||||
|
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) && !req.MergeCentralConfig {
|
||||||
c.QueryOptionDefaults(&req.QueryOptions)
|
c.QueryOptionDefaults(&req.QueryOptions)
|
||||||
|
|
||||||
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
|
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
|
||||||
|
|
|
@ -82,6 +82,16 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
|
||||||
},
|
},
|
||||||
expected: useCache,
|
expected: useCache,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "rpc if merge-central-config",
|
||||||
|
req: structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web1",
|
||||||
|
MergeCentralConfig: true,
|
||||||
|
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
|
||||||
|
},
|
||||||
|
expected: useRPC,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
|
|
@ -4,12 +4,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/imdario/mergo"
|
|
||||||
"github.com/mitchellh/copystructure"
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,7 +145,7 @@ func (w *serviceConfigWatch) register(ctx context.Context) error {
|
||||||
|
|
||||||
// Merge the local registration with the central defaults and update this service
|
// Merge the local registration with the central defaults and update this service
|
||||||
// in the local state.
|
// in the local state.
|
||||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
|
merged, err := consul.MergeServiceConfig(serviceDefaults, w.registration.Service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -276,7 +275,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
|
||||||
|
|
||||||
// Merge the local registration with the central defaults and update this service
|
// Merge the local registration with the central defaults and update this service
|
||||||
// in the local state.
|
// in the local state.
|
||||||
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service)
|
merged, err := consul.MergeServiceConfig(serviceDefaults, w.registration.Service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -348,114 +347,3 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
|
||||||
}
|
}
|
||||||
return req
|
return req
|
||||||
}
|
}
|
||||||
|
|
||||||
// mergeServiceConfig from service into defaults to produce the final effective
|
|
||||||
// config for the watched service.
|
|
||||||
func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) {
|
|
||||||
if defaults == nil {
|
|
||||||
return service, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// We don't want to change s.registration in place since it is our source of
|
|
||||||
// truth about what was actually registered before defaults applied. So copy
|
|
||||||
// it first.
|
|
||||||
nsRaw, err := copystructure.Copy(service)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge proxy defaults
|
|
||||||
ns := nsRaw.(*structs.NodeService)
|
|
||||||
|
|
||||||
if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
|
||||||
ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode
|
|
||||||
}
|
|
||||||
if ns.Proxy.Mode == structs.ProxyModeDefault {
|
|
||||||
ns.Proxy.Mode = defaults.Mode
|
|
||||||
}
|
|
||||||
if ns.Proxy.TransparentProxy.OutboundListenerPort == 0 {
|
|
||||||
ns.Proxy.TransparentProxy.OutboundListenerPort = defaults.TransparentProxy.OutboundListenerPort
|
|
||||||
}
|
|
||||||
if !ns.Proxy.TransparentProxy.DialedDirectly {
|
|
||||||
ns.Proxy.TransparentProxy.DialedDirectly = defaults.TransparentProxy.DialedDirectly
|
|
||||||
}
|
|
||||||
|
|
||||||
// remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs).
|
|
||||||
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
|
|
||||||
|
|
||||||
for _, us := range defaults.UpstreamIDConfigs {
|
|
||||||
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
remoteUpstreams[us.Upstream] = structs.Upstream{
|
|
||||||
DestinationNamespace: us.Upstream.NamespaceOrDefault(),
|
|
||||||
DestinationPartition: us.Upstream.PartitionOrDefault(),
|
|
||||||
DestinationName: us.Upstream.ID,
|
|
||||||
Config: us.Config,
|
|
||||||
MeshGateway: parsed.MeshGateway,
|
|
||||||
CentrallyConfigured: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
|
|
||||||
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
|
|
||||||
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values.
|
|
||||||
localUpstreams := make(map[structs.ServiceID]struct{})
|
|
||||||
|
|
||||||
// Merge upstream defaults into the local registration
|
|
||||||
for i := range ns.Proxy.Upstreams {
|
|
||||||
// Get a pointer not a value copy of the upstream struct
|
|
||||||
us := &ns.Proxy.Upstreams[i]
|
|
||||||
if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
localUpstreams[us.DestinationID()] = struct{}{}
|
|
||||||
|
|
||||||
remoteCfg, ok := remoteUpstreams[us.DestinationID()]
|
|
||||||
if !ok {
|
|
||||||
// No config defaults to merge
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// The local upstream config mode has the highest precedence, so only overwrite when it's set to the default
|
|
||||||
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
|
||||||
us.MeshGateway.Mode = remoteCfg.MeshGateway.Mode
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge in everything else that is read from the map
|
|
||||||
if err := mergo.Merge(&us.Config, remoteCfg.Config); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete the mesh gateway key from opaque config since this is the value that was resolved from
|
|
||||||
// the servers and NOT the final merged value for this upstream.
|
|
||||||
// Note that we use the "mesh_gateway" key and not other variants like "MeshGateway" because
|
|
||||||
// UpstreamConfig.MergeInto and ResolveServiceConfig only use "mesh_gateway".
|
|
||||||
delete(us.Config, "mesh_gateway")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure upstreams present in central config are represented in the local configuration.
|
|
||||||
// This does not apply outside of transparent mode because in that situation every possible upstream already exists
|
|
||||||
// inside of ns.Proxy.Upstreams.
|
|
||||||
if ns.Proxy.Mode == structs.ProxyModeTransparent {
|
|
||||||
for id, remote := range remoteUpstreams {
|
|
||||||
if _, ok := localUpstreams[id]; ok {
|
|
||||||
// Remote upstream is already present locally
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ns, err
|
|
||||||
}
|
|
||||||
|
|
|
@ -8,9 +8,6 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/mitchellh/copystructure"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -860,451 +857,3 @@ func convertToMap(v interface{}) (map[string]interface{}, error) {
|
||||||
|
|
||||||
return raw, nil
|
return raw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) {
|
|
||||||
type args struct {
|
|
||||||
defaults *structs.ServiceConfigResponse
|
|
||||||
service *structs.NodeService
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
args args
|
|
||||||
want *structs.NodeService
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "new config fields",
|
|
||||||
args: args{
|
|
||||||
defaults: &structs.ServiceConfigResponse{
|
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
|
||||||
{
|
|
||||||
Upstream: structs.ServiceID{
|
|
||||||
ID: "zap",
|
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
|
||||||
},
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"passive_health_check": map[string]interface{}{
|
|
||||||
"Interval": int64(10),
|
|
||||||
"MaxFailures": int64(2),
|
|
||||||
},
|
|
||||||
"mesh_gateway": map[string]interface{}{
|
|
||||||
"Mode": "local",
|
|
||||||
},
|
|
||||||
"protocol": "grpc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
service: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zap",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
want: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zap",
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"passive_health_check": map[string]interface{}{
|
|
||||||
"Interval": int64(10),
|
|
||||||
"MaxFailures": int64(2),
|
|
||||||
},
|
|
||||||
"protocol": "grpc",
|
|
||||||
},
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeLocal,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "remote upstream config expands local upstream list in transparent mode",
|
|
||||||
args: args{
|
|
||||||
defaults: &structs.ServiceConfigResponse{
|
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
|
||||||
{
|
|
||||||
Upstream: structs.ServiceID{
|
|
||||||
ID: "zap",
|
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
|
||||||
},
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "grpc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
service: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeTransparent,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{
|
|
||||||
OutboundListenerPort: 10101,
|
|
||||||
DialedDirectly: true,
|
|
||||||
},
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zip",
|
|
||||||
LocalBindPort: 8080,
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
want: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeTransparent,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{
|
|
||||||
OutboundListenerPort: 10101,
|
|
||||||
DialedDirectly: true,
|
|
||||||
},
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zip",
|
|
||||||
LocalBindPort: 8080,
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zap",
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "grpc",
|
|
||||||
},
|
|
||||||
CentrallyConfigured: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "remote upstream config not added to local upstream list outside of transparent mode",
|
|
||||||
args: args{
|
|
||||||
defaults: &structs.ServiceConfigResponse{
|
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
|
||||||
{
|
|
||||||
Upstream: structs.ServiceID{
|
|
||||||
ID: "zap",
|
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
|
||||||
},
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "grpc",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
service: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeDirect,
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zip",
|
|
||||||
LocalBindPort: 8080,
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
want: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeDirect,
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zip",
|
|
||||||
LocalBindPort: 8080,
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"protocol": "http",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "upstream mode from remote defaults overrides local default",
|
|
||||||
args: args{
|
|
||||||
defaults: &structs.ServiceConfigResponse{
|
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
|
||||||
{
|
|
||||||
Upstream: structs.ServiceID{
|
|
||||||
ID: "zap",
|
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
|
||||||
},
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"mesh_gateway": map[string]interface{}{
|
|
||||||
"Mode": "local",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
service: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeRemote,
|
|
||||||
},
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zap",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
want: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeRemote,
|
|
||||||
},
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zap",
|
|
||||||
Config: map[string]interface{}{},
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeLocal,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "mode in local upstream config overrides all",
|
|
||||||
args: args{
|
|
||||||
defaults: &structs.ServiceConfigResponse{
|
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
|
||||||
{
|
|
||||||
Upstream: structs.ServiceID{
|
|
||||||
ID: "zap",
|
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
|
||||||
},
|
|
||||||
Config: map[string]interface{}{
|
|
||||||
"mesh_gateway": map[string]interface{}{
|
|
||||||
"Mode": "local",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
service: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeRemote,
|
|
||||||
},
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zap",
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeNone,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
want: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeRemote,
|
|
||||||
},
|
|
||||||
Upstreams: structs.Upstreams{
|
|
||||||
structs.Upstream{
|
|
||||||
DestinationNamespace: "default",
|
|
||||||
DestinationPartition: "default",
|
|
||||||
DestinationName: "zap",
|
|
||||||
Config: map[string]interface{}{},
|
|
||||||
MeshGateway: structs.MeshGatewayConfig{
|
|
||||||
Mode: structs.MeshGatewayModeNone,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
got, err := mergeServiceConfig(tt.args.defaults, tt.args.service)
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.Equal(t, tt.want, got)
|
|
||||||
|
|
||||||
// The input defaults must not be modified by the merge.
|
|
||||||
// See PR #10647
|
|
||||||
assert.Equal(t, tt.args.defaults, defaultsCopy)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_mergeServiceConfig_TransparentProxy(t *testing.T) {
|
|
||||||
type args struct {
|
|
||||||
defaults *structs.ServiceConfigResponse
|
|
||||||
service *structs.NodeService
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
args args
|
|
||||||
want *structs.NodeService
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "inherit transparent proxy settings",
|
|
||||||
args: args{
|
|
||||||
defaults: &structs.ServiceConfigResponse{
|
|
||||||
Mode: structs.ProxyModeTransparent,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{
|
|
||||||
OutboundListenerPort: 10101,
|
|
||||||
DialedDirectly: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
service: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeDefault,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
want: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeTransparent,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{
|
|
||||||
OutboundListenerPort: 10101,
|
|
||||||
DialedDirectly: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "override transparent proxy settings",
|
|
||||||
args: args{
|
|
||||||
defaults: &structs.ServiceConfigResponse{
|
|
||||||
Mode: structs.ProxyModeTransparent,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{
|
|
||||||
OutboundListenerPort: 10101,
|
|
||||||
DialedDirectly: false,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
service: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeDirect,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{
|
|
||||||
OutboundListenerPort: 808,
|
|
||||||
DialedDirectly: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
want: &structs.NodeService{
|
|
||||||
ID: "foo-proxy",
|
|
||||||
Service: "foo-proxy",
|
|
||||||
Proxy: structs.ConnectProxyConfig{
|
|
||||||
DestinationServiceName: "foo",
|
|
||||||
DestinationServiceID: "foo",
|
|
||||||
Mode: structs.ProxyModeDirect,
|
|
||||||
TransparentProxy: structs.TransparentProxyConfig{
|
|
||||||
OutboundListenerPort: 808,
|
|
||||||
DialedDirectly: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
got, err := mergeServiceConfig(tt.args.defaults, tt.args.service)
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.Equal(t, tt.want, got)
|
|
||||||
|
|
||||||
// The input defaults must not be modified by the merge.
|
|
||||||
// See PR #10647
|
|
||||||
assert.Equal(t, tt.args.defaults, defaultsCopy)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -706,6 +706,12 @@ type ServiceSpecificRequest struct {
|
||||||
// Ingress if true will only search for Ingress gateways for the given service.
|
// Ingress if true will only search for Ingress gateways for the given service.
|
||||||
Ingress bool
|
Ingress bool
|
||||||
|
|
||||||
|
// MergeCentralConfig when set to true returns a service definition merged with
|
||||||
|
// the proxy-defaults/global and service-defaults/:service config entries.
|
||||||
|
// This can be used to ensure a full service definition is returned in the response
|
||||||
|
// especially when the service might not be written into the catalog that way.
|
||||||
|
MergeCentralConfig bool
|
||||||
|
|
||||||
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
||||||
QueryOptions
|
QueryOptions
|
||||||
}
|
}
|
||||||
|
@ -752,6 +758,7 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo {
|
||||||
r.PeerName,
|
r.PeerName,
|
||||||
r.Ingress,
|
r.Ingress,
|
||||||
r.ServiceKind,
|
r.ServiceKind,
|
||||||
|
r.MergeCentralConfig,
|
||||||
}, nil)
|
}, nil)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// If there is an error, we don't set the key. A blank key forces
|
// If there is an error, we don't set the key. A blank key forces
|
||||||
|
@ -1330,7 +1337,8 @@ func (t *ServiceConnect) UnmarshalJSON(data []byte) (err error) {
|
||||||
|
|
||||||
// IsSidecarProxy returns true if the NodeService is a sidecar proxy.
|
// IsSidecarProxy returns true if the NodeService is a sidecar proxy.
|
||||||
func (s *NodeService) IsSidecarProxy() bool {
|
func (s *NodeService) IsSidecarProxy() bool {
|
||||||
return s.Kind == ServiceKindConnectProxy && s.Proxy.DestinationServiceID != ""
|
return s.Kind == ServiceKindConnectProxy &&
|
||||||
|
(s.Proxy.DestinationServiceID != "" || s.Proxy.DestinationServiceName != "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *NodeService) IsGateway() bool {
|
func (s *NodeService) IsGateway() bool {
|
||||||
|
|
|
@ -190,6 +190,12 @@ type QueryOptions struct {
|
||||||
// Filter requests filtering data prior to it being returned. The string
|
// Filter requests filtering data prior to it being returned. The string
|
||||||
// is a go-bexpr compatible expression.
|
// is a go-bexpr compatible expression.
|
||||||
Filter string
|
Filter string
|
||||||
|
|
||||||
|
// MergeCentralConfig returns a service definition merged with the
|
||||||
|
// proxy-defaults/global and service-defaults/:service config entries.
|
||||||
|
// This can be used to ensure a full service definition is returned in the response
|
||||||
|
// especially when the service might not be written into the catalog that way.
|
||||||
|
MergeCentralConfig bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *QueryOptions) Context() context.Context {
|
func (o *QueryOptions) Context() context.Context {
|
||||||
|
@ -858,6 +864,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
|
||||||
r.header.Set("Cache-Control", strings.Join(cc, ", "))
|
r.header.Set("Cache-Control", strings.Join(cc, ", "))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if q.MergeCentralConfig {
|
||||||
|
r.params.Set("merge-central-config", "")
|
||||||
|
}
|
||||||
|
|
||||||
r.ctx = q.ctx
|
r.ctx = q.ctx
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,4 +61,5 @@ const (
|
||||||
Watch string = "watch"
|
Watch string = "watch"
|
||||||
XDS string = "xds"
|
XDS string = "xds"
|
||||||
Vault string = "vault"
|
Vault string = "vault"
|
||||||
|
Health string = "health"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue