[OSS] Support merge-central-config option in node services list API (#13450)

Adds the merge-central-config query param option to the /catalog/node-services/:node-name API,
to get a service definition in the response that is merged with central defaults (proxy-defaults/service-defaults).

Updated the consul connect envoy command to use this option when
retrieving the proxy service details so as to render the bootstrap configuration correctly.
This commit is contained in:
Riddhi Shah 2022-06-15 08:30:31 -07:00 committed by GitHub
parent 0a9c1c0649
commit 411edc876b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 182 additions and 8 deletions

3
.changelog/13450.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:enhancement
api: `merge-central-config` query parameter support added to `/catalog/node-services/:node-name` API, to view a fully resolved service definition (especially when not written into the catalog that way).
```

View File

@ -499,6 +499,10 @@ func (s *HTTPHandlers) CatalogNodeServiceList(resp http.ResponseWriter, req *htt
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"}
}
if _, ok := req.URL.Query()["merge-central-config"]; ok {
args.MergeCentralConfig = true
}
// Make the RPC request
var out structs.IndexedNodeServiceList
defer setMeta(resp, &out.QueryMeta)

View File

@ -1529,6 +1529,111 @@ func TestCatalogNodeServiceList(t *testing.T) {
require.Equal(t, args.Service.Proxy, proxySvc.Proxy)
}
func TestCatalogNodeServiceList_MergeCentralConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register the service
registerServiceReq := registerService(t, a)
// Register proxy-defaults
proxyGlobalEntry := registerProxyDefaults(t, a)
// Register service-defaults
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config", registerServiceReq.Node)
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogNodeServiceList(resp, req)
require.NoError(t, err)
assertIndex(t, resp)
nodeServices := obj.(*structs.NodeServiceList)
// validate response
require.Len(t, nodeServices.Services, 1)
validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
}
func TestCatalogNodeServiceList_MergeCentralConfigBlocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register the service
registerServiceReq := registerService(t, a)
// Register proxy-defaults
proxyGlobalEntry := registerProxyDefaults(t, a)
// Run the query
rpcReq := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: registerServiceReq.Node,
MergeCentralConfig: true,
}
var rpcResp structs.IndexedNodeServiceList
require.NoError(t, a.RPC("Catalog.NodeServiceList", &rpcReq, &rpcResp))
require.Len(t, rpcResp.NodeServices.Services, 1)
nodeService := rpcResp.NodeServices.Services[0]
require.Equal(t, registerServiceReq.Service.Service, nodeService.Service)
// validate proxy global defaults are resolved in the merged service config
require.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config)
require.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode)
// Async cause a change - register service defaults
waitIndex := rpcResp.Index
start := time.Now()
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
go func() {
time.Sleep(100 * time.Millisecond)
// Register service-defaults
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
}()
const waitDuration = 3 * time.Second
RUN_BLOCKING_QUERY:
url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config&wait=%s&index=%d",
registerServiceReq.Node, waitDuration.String(), waitIndex)
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogNodeServiceList(resp, req)
require.NoError(t, err)
assertIndex(t, resp)
elapsed := time.Since(start)
idx := getIndex(t, resp)
if idx < waitIndex {
t.Fatalf("bad index returned: %v", idx)
} else if idx == waitIndex {
if elapsed > waitDuration {
// This should prevent the loop from running longer than the waitDuration
t.Fatalf("too slow: %v", elapsed)
}
goto RUN_BLOCKING_QUERY
}
// Should block at least 100ms before getting the changed results
if elapsed < 100*time.Millisecond {
t.Fatalf("too fast: %v", elapsed)
}
nodeServices := obj.(*structs.NodeServiceList)
// validate response
require.Len(t, nodeServices.Services, 1)
validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
}
func TestCatalogNodeServices_Filter(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -869,6 +869,11 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru
return err
}
var (
priorMergeHash uint64
ranMergeOnce bool
)
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
@ -878,10 +883,55 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru
return err
}
mergedServices := services
var cfgIndex uint64
if services != nil && args.MergeCentralConfig {
var mergedNodeServices []*structs.NodeService
for _, ns := range services.Services {
mergedns := ns
if ns.IsSidecarProxy() || ns.IsGateway() {
serviceSpecificReq := structs.ServiceSpecificRequest{
Datacenter: args.Datacenter,
QueryOptions: args.QueryOptions,
}
cfgIndex, mergedns, err = mergeNodeServiceWithCentralConfig(ws, state, &serviceSpecificReq, ns, c.logger)
if err != nil {
return err
}
if cfgIndex > index {
index = cfgIndex
}
}
mergedNodeServices = append(mergedNodeServices, mergedns)
}
if len(mergedNodeServices) > 0 {
mergedServices.Services = mergedNodeServices
}
// Generate a hash of the mergedServices driving this response.
// Use it to determine if the response is identical to a prior wakeup.
newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranMergeOnce && priorMergeHash == newMergeHash {
// the below assignment is not required as the if condition already validates equality,
// but makes it more clear that prior value is being reset to the new hash on each run.
priorMergeHash = newMergeHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which is desirable
return errNotChanged
} else {
priorMergeHash = newMergeHash
ranMergeOnce = true
}
}
reply.Index = index
if services != nil {
reply.NodeServices = *services
if mergedServices != nil {
reply.NodeServices = *mergedServices
raw, err := filter.Execute(reply.NodeServices.Services)
if err != nil {

View File

@ -776,9 +776,15 @@ func (r *ServiceSpecificRequest) CacheMinIndex() uint64 {
// NodeSpecificRequest is used to request the information about a single node
type NodeSpecificRequest struct {
Datacenter string
Node string
PeerName string
Datacenter string
Node string
PeerName string
// MergeCentralConfig when set to true returns a service definition merged with
// the proxy-defaults/global and service-defaults/:service config entries.
// This can be used to ensure a full service definition is returned in the response
// especially when the service might not be written into the catalog that way.
MergeCentralConfig bool
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
QueryOptions
}
@ -801,6 +807,7 @@ func (r *NodeSpecificRequest) CacheInfo() cache.RequestInfo {
r.Node,
r.Filter,
r.EnterpriseMeta,
r.MergeCentralConfig,
}, nil)
if err == nil {
// If there is an error, we don't set the key. A blank key forces

View File

@ -531,13 +531,18 @@ func (c *cmd) generateConfig() ([]byte, error) {
datacenter = svc.Datacenter
} else {
filter := fmt.Sprintf("ID == %q", c.proxyID)
svcList, _, err := c.client.Catalog().NodeServiceList(c.nodeName, &api.QueryOptions{Filter: filter})
svcList, _, err := c.client.Catalog().NodeServiceList(c.nodeName,
&api.QueryOptions{Filter: filter, MergeCentralConfig: true})
if err != nil {
return nil, fmt.Errorf("failed to fetch proxy config from catalog for node %q: %w", c.nodeName, err)
}
if len(svcList.Services) != 1 {
return nil, fmt.Errorf("expected to find only one proxy service with ID: %q", c.proxyID)
if len(svcList.Services) == 0 {
return nil, fmt.Errorf("Proxy service with ID %q not found", c.proxyID)
}
if len(svcList.Services) > 1 {
return nil, fmt.Errorf("Expected to find only one proxy service with ID %q, but more were found", c.proxyID)
}
svcProxyConfig = svcList.Services[0].Proxy
serviceName = svcList.Services[0].Service
ns = svcList.Services[0].Namespace