mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 03:29:43 +00:00
b9ab63c55d
When the v2 catalog experiment is enabled the old v1 catalog apis will be forcibly disabled at both the API (json) layer and the RPC (msgpack) layer. This will also disable anti-entropy as it uses the v1 api. This includes all of /v1/catalog/*, /v1/health/*, most of /v1/agent/*, /v1/config/*, and most of /v1/internal/*.
2139 lines
58 KiB
Go
2139 lines
58 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/serf/coordinate"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
|
"github.com/hashicorp/consul/testrpc"
|
|
)
|
|
|
|
func TestCatalogEndpointsFailInV2(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, `experiments = ["resource-apis"]`)
|
|
|
|
checkRequest := func(method, url string) {
|
|
t.Run(method+" "+url, func(t *testing.T) {
|
|
assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url, "{}")
|
|
})
|
|
}
|
|
|
|
checkRequest("PUT", "/v1/catalog/register")
|
|
checkRequest("GET", "/v1/catalog/connect/")
|
|
checkRequest("PUT", "/v1/catalog/deregister")
|
|
checkRequest("GET", "/v1/catalog/datacenters")
|
|
checkRequest("GET", "/v1/catalog/nodes")
|
|
checkRequest("GET", "/v1/catalog/services")
|
|
checkRequest("GET", "/v1/catalog/service/")
|
|
checkRequest("GET", "/v1/catalog/node/")
|
|
checkRequest("GET", "/v1/catalog/node-services/")
|
|
checkRequest("GET", "/v1/catalog/gateway-services/")
|
|
}
|
|
|
|
func assertV1CatalogEndpointDoesNotWorkWithV2(t *testing.T, a *TestAgent, method, url string, requestBody string) {
|
|
var body io.Reader
|
|
switch method {
|
|
case http.MethodPost, http.MethodPut:
|
|
body = strings.NewReader(requestBody + "\n")
|
|
}
|
|
|
|
req, err := http.NewRequest(method, url, body)
|
|
require.NoError(t, err)
|
|
|
|
resp := httptest.NewRecorder()
|
|
a.srv.h.ServeHTTP(resp, req)
|
|
require.Equal(t, http.StatusBadRequest, resp.Code)
|
|
|
|
got, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
|
|
require.Contains(t, string(got), structs.ErrUsingV2CatalogExperiment.Error())
|
|
}
|
|
|
|
func TestCatalogRegister_PeeringRegistration(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
t.Run("deny peer registrations by default", func(t *testing.T) {
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
// Register request with peer
|
|
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
|
|
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
|
|
|
|
obj, err := a.srv.CatalogRegister(nil, req)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
|
|
require.Nil(t, obj)
|
|
})
|
|
|
|
t.Run("cannot hcl set the peer registrations config", func(t *testing.T) {
|
|
// this will have no effect, as the value is overriden in non user source
|
|
a := NewTestAgent(t, "peering = { test_allow_peer_registrations = true }")
|
|
defer a.Shutdown()
|
|
|
|
// Register request with peer
|
|
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
|
|
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
|
|
|
|
obj, err := a.srv.CatalogRegister(nil, req)
|
|
require.Error(t, err)
|
|
require.Contains(t, err.Error(), "cannot register requests with PeerName in them")
|
|
require.Nil(t, obj)
|
|
})
|
|
|
|
t.Run("allow peer registrations with test overrides", func(t *testing.T) {
|
|
// the only way to set the config in the agent is via the overrides
|
|
a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`})
|
|
defer a.Shutdown()
|
|
|
|
// Register request with peer
|
|
args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"}
|
|
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
|
|
|
|
obj, err := a.srv.CatalogRegister(nil, req)
|
|
require.NoError(t, err)
|
|
applied, ok := obj.(bool)
|
|
require.True(t, ok)
|
|
require.True(t, applied)
|
|
})
|
|
|
|
}
|
|
|
|
func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
for _, addr := range []string{"0.0.0.0", "::", "[::]"} {
|
|
t.Run("addr "+addr, func(t *testing.T) {
|
|
args := &structs.RegisterRequest{
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "test",
|
|
Address: addr,
|
|
Port: 8080,
|
|
},
|
|
}
|
|
req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
|
|
_, err := a.srv.CatalogRegister(nil, req)
|
|
if err == nil || err.Error() != "Invalid service address" {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCatalogDeregister(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
// Register node
|
|
args := &structs.DeregisterRequest{Node: "foo"}
|
|
req, _ := http.NewRequest("PUT", "/v1/catalog/deregister", jsonReader(args))
|
|
obj, err := a.srv.CatalogDeregister(nil, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
res := obj.(bool)
|
|
if res != true {
|
|
t.Fatalf("bad: %v", res)
|
|
}
|
|
}
|
|
|
|
func TestCatalogDatacenters(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/datacenters", nil)
|
|
obj, err := a.srv.CatalogDatacenters(nil, req)
|
|
if err != nil {
|
|
r.Fatal(err)
|
|
}
|
|
|
|
dcs := obj.([]string)
|
|
if got, want := len(dcs), 1; got != want {
|
|
r.Fatalf("got %d data centers want %d", got, want)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCatalogNodes(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register node
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Verify an index is set
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.Nodes)
|
|
if len(nodes) != 2 {
|
|
t.Fatalf("bad: %v ; nodes:=%v", obj, nodes)
|
|
}
|
|
}
|
|
|
|
func TestCatalogNodes_MetaFilter(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register a node with a meta field
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
NodeMeta: map[string]string{
|
|
"somekey": "somevalue",
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?node-meta=somekey:somevalue", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Verify an index is set
|
|
assertIndex(t, resp)
|
|
|
|
// Verify we only get the node with the correct meta field back
|
|
nodes := obj.(structs.Nodes)
|
|
if len(nodes) != 1 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" {
|
|
t.Fatalf("bad: %v", nodes[0].Meta)
|
|
}
|
|
}
|
|
|
|
func TestCatalogNodes_Filter(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register a node with a meta field
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
NodeMeta: map[string]string{
|
|
"somekey": "somevalue",
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?filter="+url.QueryEscape("Meta.somekey == somevalue"), nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodes(resp, req)
|
|
require.NoError(t, err)
|
|
|
|
// Verify an index is set
|
|
assertIndex(t, resp)
|
|
|
|
// Verify we only get the node with the correct meta field back
|
|
nodes := obj.(structs.Nodes)
|
|
require.Len(t, nodes, 1)
|
|
|
|
v, ok := nodes[0].Meta["somekey"]
|
|
require.True(t, ok)
|
|
require.Equal(t, v, "somevalue")
|
|
}
|
|
|
|
func TestCatalogNodes_WanTranslation(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a1 := NewTestAgent(t, `
|
|
datacenter = "dc1"
|
|
translate_wan_addrs = true
|
|
acl_datacenter = ""
|
|
`)
|
|
defer a1.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a1.RPC, "dc1")
|
|
|
|
a2 := NewTestAgent(t, `
|
|
datacenter = "dc2"
|
|
translate_wan_addrs = true
|
|
acl_datacenter = ""
|
|
`)
|
|
defer a2.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a2.RPC, "dc2")
|
|
|
|
// Wait for the WAN join.
|
|
addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
|
|
if _, err := a2.JoinWAN([]string{addr}); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
testrpc.WaitForLeader(t, a1.RPC, "dc1")
|
|
testrpc.WaitForLeader(t, a2.RPC, "dc2")
|
|
retry.Run(t, func(r *retry.R) {
|
|
if got, want := len(a1.WANMembers()), 2; got < want {
|
|
r.Fatalf("got %d WAN members want at least %d", got, want)
|
|
}
|
|
})
|
|
|
|
// Register a node with DC2.
|
|
{
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc2",
|
|
Node: "wan_translation_test",
|
|
Address: "127.0.0.1",
|
|
TaggedAddresses: map[string]string{
|
|
"wan": "127.0.0.2",
|
|
},
|
|
Service: &structs.NodeService{
|
|
Service: "http_wan_translation_test",
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a2.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Query nodes in DC2 from DC1.
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc2", nil)
|
|
resp1 := httptest.NewRecorder()
|
|
obj1, err1 := a1.srv.CatalogNodes(resp1, req)
|
|
if err1 != nil {
|
|
t.Fatalf("err: %v", err1)
|
|
}
|
|
assertIndex(t, resp1)
|
|
|
|
// Expect that DC1 gives us a WAN address (since the node is in DC2).
|
|
nodes1 := obj1.(structs.Nodes)
|
|
if len(nodes1) != 2 {
|
|
t.Fatalf("bad: %v, nodes:=%v", obj1, nodes1)
|
|
}
|
|
var address string
|
|
for _, node := range nodes1 {
|
|
if node.Node == "wan_translation_test" {
|
|
address = node.Address
|
|
}
|
|
}
|
|
if address != "127.0.0.2" {
|
|
t.Fatalf("bad: %s", address)
|
|
}
|
|
|
|
// Query DC2 from DC2.
|
|
resp2 := httptest.NewRecorder()
|
|
obj2, err2 := a2.srv.CatalogNodes(resp2, req)
|
|
if err2 != nil {
|
|
t.Fatalf("err: %v", err2)
|
|
}
|
|
assertIndex(t, resp2)
|
|
|
|
// Expect that DC2 gives us a private address (since the node is in DC2).
|
|
nodes2 := obj2.(structs.Nodes)
|
|
if len(nodes2) != 2 {
|
|
t.Fatalf("bad: %v", obj2)
|
|
}
|
|
for _, node := range nodes2 {
|
|
if node.Node == "wan_translation_test" {
|
|
address = node.Address
|
|
}
|
|
}
|
|
if address != "127.0.0.1" {
|
|
t.Fatalf("bad: %s", address)
|
|
}
|
|
}
|
|
|
|
func TestCatalogNodes_Blocking(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WaitForAntiEntropySync())
|
|
|
|
// Run the query
|
|
args := &structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
}
|
|
var out structs.IndexedNodes
|
|
if err := a.RPC(context.Background(), "Catalog.ListNodes", *args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Async cause a change
|
|
waitIndex := out.Index
|
|
start := time.Now()
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
}
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Errorf("err: %v", err)
|
|
}
|
|
}()
|
|
|
|
const waitDuration = 3 * time.Second
|
|
|
|
// Re-run the query, if errantly woken up with no change, resume blocking.
|
|
var elapsed time.Duration
|
|
RUN_BLOCKING_QUERY:
|
|
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/catalog/nodes?wait=%s&index=%d",
|
|
waitDuration.String(),
|
|
waitIndex), nil)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
elapsed = time.Since(start)
|
|
|
|
idx := getIndex(t, resp)
|
|
if idx < waitIndex {
|
|
t.Fatalf("bad: %v", idx)
|
|
} else if idx == waitIndex {
|
|
if elapsed > waitDuration {
|
|
// This should prevent the loop from running longer than the
|
|
// waitDuration
|
|
t.Fatalf("too slow: %v", elapsed)
|
|
}
|
|
goto RUN_BLOCKING_QUERY
|
|
}
|
|
|
|
// Should block at least 100ms before getting the changed results
|
|
if elapsed < 100*time.Millisecond {
|
|
t.Fatalf("too fast: %v", elapsed)
|
|
}
|
|
|
|
nodes := obj.(structs.Nodes)
|
|
if len(nodes) != 2 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
|
|
}
|
|
|
|
func TestCatalogNodes_DistanceSort(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register nodes.
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
}
|
|
var out struct{}
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
args = &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "bar",
|
|
Address: "127.0.0.2",
|
|
}
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
// Nobody has coordinates set so this will still return them in the
|
|
// order they are indexed.
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1&near=foo", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodes(resp, req)
|
|
require.NoError(t, err)
|
|
|
|
assertIndex(t, resp)
|
|
nodes := obj.(structs.Nodes)
|
|
require.Len(t, nodes, 3)
|
|
require.Equal(t, "bar", nodes[0].Node)
|
|
require.Equal(t, "foo", nodes[1].Node)
|
|
require.Equal(t, a.Config.NodeName, nodes[2].Node)
|
|
|
|
// Send an update for the node and wait for it to get applied.
|
|
arg := structs.CoordinateUpdateRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
|
|
}
|
|
require.NoError(t, a.RPC(context.Background(), "Coordinate.Update", &arg, &out))
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
// Query again and now foo should have moved to the front of the line.
|
|
req, _ = http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1&near=foo", nil)
|
|
resp = httptest.NewRecorder()
|
|
obj, err = a.srv.CatalogNodes(resp, req)
|
|
require.NoError(t, err)
|
|
|
|
assertIndex(t, resp)
|
|
nodes = obj.(structs.Nodes)
|
|
require.Len(t, nodes, 3)
|
|
require.Equal(t, "foo", nodes[0].Node)
|
|
require.Equal(t, "bar", nodes[1].Node)
|
|
require.Equal(t, a.Config.NodeName, nodes[2].Node)
|
|
}
|
|
|
|
func TestCatalogServices(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register node
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/services?dc=dc1", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServices(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
|
|
services := obj.(structs.Services)
|
|
if len(services) != 2 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
}
|
|
|
|
func TestCatalogServices_NodeMetaFilter(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
// Register node
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
NodeMeta: map[string]string{
|
|
"somekey": "somevalue",
|
|
},
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/services?node-meta=somekey:somevalue", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServices(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
|
|
services := obj.(structs.Services)
|
|
if len(services) != 1 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
if _, ok := services[args.Service.Service]; !ok {
|
|
t.Fatalf("bad: %v", services)
|
|
}
|
|
}
|
|
|
|
func TestCatalogRegister_checkRegistration(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
// Register node with a service and check
|
|
check := structs.HealthCheck{
|
|
Node: "foo",
|
|
CheckID: "foo-check",
|
|
Name: "foo check",
|
|
ServiceID: "api",
|
|
Definition: structs.HealthCheckDefinition{
|
|
TCP: "localhost:8888",
|
|
Interval: 5 * time.Second,
|
|
},
|
|
}
|
|
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
},
|
|
Check: &check,
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
req, _ := http.NewRequest("GET", "/v1/health/checks/api", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.HealthServiceChecks(resp, req)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
|
|
checks := obj.(structs.HealthChecks)
|
|
if len(checks) != 1 {
|
|
r.Fatalf("expected 1 check, got: %d", len(checks))
|
|
}
|
|
if checks[0].CheckID != check.CheckID {
|
|
r.Fatalf("expected check id %s, got %s", check.Type, checks[0].Type)
|
|
}
|
|
if checks[0].Type != "tcp" {
|
|
r.Fatalf("expected check type tcp, got %s", checks[0].Type)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCatalogRegister_checkRegistration_UDP(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
// Register node with a service and check
|
|
check := structs.HealthCheck{
|
|
Node: "foo",
|
|
CheckID: "foo-check",
|
|
Name: "foo check",
|
|
ServiceID: "api",
|
|
Definition: structs.HealthCheckDefinition{
|
|
UDP: "localhost:8888",
|
|
Interval: 5 * time.Second,
|
|
},
|
|
}
|
|
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
},
|
|
Check: &check,
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
req, _ := http.NewRequest("GET", "/v1/health/checks/api", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.HealthServiceChecks(resp, req)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
|
|
checks := obj.(structs.HealthChecks)
|
|
if len(checks) != 1 {
|
|
r.Fatalf("expected 1 check, got: %d", len(checks))
|
|
}
|
|
if checks[0].CheckID != check.CheckID {
|
|
r.Fatalf("expected check id %s, got %s", check.Type, checks[0].Type)
|
|
}
|
|
if checks[0].Type != "udp" {
|
|
r.Fatalf("expected check type udp, got %s", checks[0].Type)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestCatalogServiceNodes(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
// Make sure an empty list is returned, not a nil
|
|
{
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?tag=a", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
if nodes == nil || len(nodes) != 0 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
}
|
|
|
|
// Register node
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
Tags: []string{"a"},
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?tag=a", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
if len(nodes) != 1 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
|
|
// Test caching
|
|
{
|
|
// List instances with cache enabled
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?cached", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
require.NoError(t, err)
|
|
nodes := obj.(structs.ServiceNodes)
|
|
assert.Len(t, nodes, 1)
|
|
|
|
// Should be a cache miss
|
|
assert.Equal(t, "MISS", resp.Header().Get("X-Cache"))
|
|
}
|
|
|
|
{
|
|
// List instances with cache enabled
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?cached", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
require.NoError(t, err)
|
|
nodes := obj.(structs.ServiceNodes)
|
|
assert.Len(t, nodes, 1)
|
|
|
|
// Should be a cache HIT now!
|
|
assert.Equal(t, "HIT", resp.Header().Get("X-Cache"))
|
|
assert.Equal(t, "0", resp.Header().Get("Age"))
|
|
}
|
|
|
|
// Ensure background refresh works
|
|
{
|
|
// Register a new instance of the service
|
|
args2 := args
|
|
args2.Node = "bar"
|
|
args2.Address = "127.0.0.2"
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
// List it again
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?cached", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
r.Check(err)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
if len(nodes) != 2 {
|
|
r.Fatalf("Want 2 nodes")
|
|
}
|
|
|
|
// Should be a cache hit! The data should've updated in the cache
|
|
// in the background so this should've been fetched directly from
|
|
// the cache.
|
|
if resp.Header().Get("X-Cache") != "HIT" {
|
|
r.Fatalf("should be a cache hit")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
// Make sure an empty list is returned, not a nil
|
|
{
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
if nodes == nil || len(nodes) != 0 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
}
|
|
|
|
// Register node
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
NodeMeta: map[string]string{
|
|
"somekey": "somevalue",
|
|
},
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
if len(nodes) != 1 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
}
|
|
|
|
func TestCatalogServiceNodes_Filter(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
queryPath := "/v1/catalog/service/api?filter=" + url.QueryEscape("ServiceMeta.somekey == somevalue")
|
|
|
|
// Make sure an empty list is returned, not a nil
|
|
{
|
|
req, _ := http.NewRequest("GET", queryPath, nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
require.NoError(t, err)
|
|
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
require.Empty(t, nodes)
|
|
}
|
|
|
|
// Register node
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
Meta: map[string]string{
|
|
"somekey": "somevalue",
|
|
},
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
// Register a second service for the node
|
|
args = &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
ID: "api2",
|
|
Service: "api",
|
|
Meta: map[string]string{
|
|
"somekey": "notvalue",
|
|
},
|
|
},
|
|
SkipNodeUpdate: true,
|
|
}
|
|
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", queryPath, nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
require.NoError(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
require.Len(t, nodes, 1)
|
|
}
|
|
|
|
func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a1 := NewTestAgent(t, `
|
|
datacenter = "dc1"
|
|
translate_wan_addrs = true
|
|
acl_datacenter = ""
|
|
`)
|
|
defer a1.Shutdown()
|
|
|
|
a2 := NewTestAgent(t, `
|
|
datacenter = "dc2"
|
|
translate_wan_addrs = true
|
|
acl_datacenter = ""
|
|
`)
|
|
defer a2.Shutdown()
|
|
|
|
// Wait for the WAN join.
|
|
addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
|
|
_, err := a2.srv.agent.JoinWAN([]string{addr})
|
|
require.NoError(t, err)
|
|
retry.Run(t, func(r *retry.R) {
|
|
require.Len(r, a1.WANMembers(), 2)
|
|
})
|
|
|
|
// Register a node with DC2.
|
|
{
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc2",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
TaggedAddresses: map[string]string{
|
|
"wan": "127.0.0.2",
|
|
},
|
|
Service: &structs.NodeService{
|
|
Service: "http_wan_translation_test",
|
|
Address: "127.0.0.1",
|
|
Port: 8080,
|
|
TaggedAddresses: map[string]structs.ServiceAddress{
|
|
"wan": {
|
|
Address: "1.2.3.4",
|
|
Port: 80,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
require.NoError(t, a2.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
}
|
|
|
|
// Query for the node in DC2 from DC1.
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/http_wan_translation_test?dc=dc2", nil)
|
|
resp1 := httptest.NewRecorder()
|
|
obj1, err1 := a1.srv.CatalogServiceNodes(resp1, req)
|
|
require.NoError(t, err1)
|
|
require.NoError(t, checkIndex(resp1))
|
|
|
|
// Expect that DC1 gives us a WAN address (since the node is in DC2).
|
|
nodes1, ok := obj1.(structs.ServiceNodes)
|
|
require.True(t, ok, "obj1 is not a structs.ServiceNodes")
|
|
require.Len(t, nodes1, 1)
|
|
node1 := nodes1[0]
|
|
require.Equal(t, node1.Address, "127.0.0.2")
|
|
require.Equal(t, node1.ServiceAddress, "1.2.3.4")
|
|
require.Equal(t, node1.ServicePort, 80)
|
|
|
|
// Query DC2 from DC2.
|
|
resp2 := httptest.NewRecorder()
|
|
obj2, err2 := a2.srv.CatalogServiceNodes(resp2, req)
|
|
require.NoError(t, err2)
|
|
require.NoError(t, checkIndex(resp2))
|
|
|
|
// Expect that DC2 gives us a local address (since the node is in DC2).
|
|
nodes2, ok := obj2.(structs.ServiceNodes)
|
|
require.True(t, ok, "obj2 is not a structs.ServiceNodes")
|
|
require.Len(t, nodes2, 1)
|
|
node2 := nodes2[0]
|
|
require.Equal(t, node2.Address, "127.0.0.1")
|
|
require.Equal(t, node2.ServiceAddress, "127.0.0.1")
|
|
require.Equal(t, node2.ServicePort, 8080)
|
|
}
|
|
|
|
func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register nodes.
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "bar",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
Tags: []string{"a"},
|
|
},
|
|
}
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/service/api?tag=a", nil)
|
|
args = &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.2",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
Tags: []string{"a"},
|
|
},
|
|
}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Nobody has coordinates set so this will still return them in the
|
|
// order they are indexed.
|
|
req, _ = http.NewRequest("GET", "/v1/catalog/service/api?tag=a&near=foo", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
nodes := obj.(structs.ServiceNodes)
|
|
if len(nodes) != 2 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
if nodes[0].Node != "bar" {
|
|
t.Fatalf("bad: %v", nodes)
|
|
}
|
|
if nodes[1].Node != "foo" {
|
|
t.Fatalf("bad: %v", nodes)
|
|
}
|
|
|
|
// Send an update for the node and wait for it to get applied.
|
|
arg := structs.CoordinateUpdateRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
|
|
}
|
|
if err := a.RPC(context.Background(), "Coordinate.Update", &arg, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Eventually foo should move to the front of the line.
|
|
retry.Run(t, func(r *retry.R) {
|
|
req, _ = http.NewRequest("GET", "/v1/catalog/service/api?tag=a&near=foo", nil)
|
|
resp = httptest.NewRecorder()
|
|
obj, err = a.srv.CatalogServiceNodes(resp, req)
|
|
if err != nil {
|
|
r.Fatalf("err: %v", err)
|
|
}
|
|
|
|
assertIndex(t, resp)
|
|
nodes = obj.(structs.ServiceNodes)
|
|
if len(nodes) != 2 {
|
|
r.Fatalf("bad: %v", obj)
|
|
}
|
|
if nodes[0].Node != "foo" {
|
|
r.Fatalf("bad: %v", nodes)
|
|
}
|
|
if nodes[1].Node != "bar" {
|
|
r.Fatalf("bad: %v", nodes)
|
|
}
|
|
})
|
|
}
|
|
|
|
// Test that connect proxies can be queried via /v1/catalog/service/:service
|
|
// directly and that their results contain the proxy fields.
|
|
func TestCatalogServiceNodes_ConnectProxy(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register
|
|
args := structs.TestRegisterRequestProxy(t)
|
|
var out struct{}
|
|
assert.Nil(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
|
"/v1/catalog/service/%s", args.Service.Service), nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
assert.Nil(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
assert.Len(t, nodes, 1)
|
|
assert.Equal(t, structs.ServiceKindConnectProxy, nodes[0].ServiceKind)
|
|
assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
|
|
}
|
|
|
|
func registerService(t *testing.T, a *TestAgent) (registerServiceReq *structs.RegisterRequest) {
|
|
t.Helper()
|
|
entMeta := acl.DefaultEnterpriseMeta()
|
|
registerServiceReq = structs.TestRegisterRequestProxy(t)
|
|
registerServiceReq.EnterpriseMeta = *entMeta
|
|
registerServiceReq.Service.EnterpriseMeta = *entMeta
|
|
registerServiceReq.Service.Proxy.Upstreams = structs.TestAddDefaultsToUpstreams(t, registerServiceReq.Service.Proxy.Upstreams, *entMeta)
|
|
registerServiceReq.Check = &structs.HealthCheck{
|
|
Node: registerServiceReq.Node,
|
|
Name: "check1",
|
|
}
|
|
|
|
var out struct{}
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", registerServiceReq, &out))
|
|
|
|
return
|
|
}
|
|
|
|
func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs.ProxyConfigEntry) {
|
|
t.Helper()
|
|
// Register proxy-defaults
|
|
proxyGlobalEntry = structs.ProxyConfigEntry{
|
|
Kind: structs.ProxyDefaults,
|
|
Name: structs.ProxyConfigGlobal,
|
|
Mode: structs.ProxyModeDirect,
|
|
Config: map[string]interface{}{
|
|
"local_connect_timeout_ms": uint64(1000),
|
|
"handshake_timeout_ms": uint64(1000),
|
|
},
|
|
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
|
}
|
|
proxyDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
|
|
Op: structs.ConfigEntryUpsert,
|
|
Datacenter: "dc1",
|
|
Entry: &proxyGlobalEntry,
|
|
}
|
|
var proxyDefaultsConfigEntryResp bool
|
|
require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp))
|
|
return
|
|
}
|
|
|
|
func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (serviceDefaultsConfigEntry structs.ServiceConfigEntry) {
|
|
t.Helper()
|
|
limits := 512
|
|
serviceDefaultsConfigEntry = structs.ServiceConfigEntry{
|
|
Kind: structs.ServiceDefaults,
|
|
Name: serviceName,
|
|
Mode: structs.ProxyModeTransparent,
|
|
UpstreamConfig: &structs.UpstreamConfiguration{
|
|
Defaults: &structs.UpstreamConfig{
|
|
MeshGateway: structs.MeshGatewayConfig{
|
|
Mode: structs.MeshGatewayModeLocal,
|
|
},
|
|
Limits: &structs.UpstreamLimits{
|
|
MaxConnections: &limits,
|
|
},
|
|
},
|
|
},
|
|
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
|
}
|
|
serviceDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
|
|
Op: structs.ConfigEntryUpsert,
|
|
Datacenter: "dc1",
|
|
Entry: &serviceDefaultsConfigEntry,
|
|
}
|
|
var serviceDefaultsConfigEntryResp bool
|
|
require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp))
|
|
return
|
|
}
|
|
|
|
func validateMergeCentralConfigResponse(t *testing.T, v *structs.ServiceNode,
|
|
registerServiceReq *structs.RegisterRequest,
|
|
proxyGlobalEntry structs.ProxyConfigEntry,
|
|
serviceDefaultsConfigEntry structs.ServiceConfigEntry) {
|
|
|
|
t.Helper()
|
|
require.Equal(t, registerServiceReq.Service.Service, v.ServiceName)
|
|
// validate proxy global defaults are resolved in the merged service config
|
|
require.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config)
|
|
// validate service defaults override proxy-defaults/global
|
|
require.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode)
|
|
require.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode)
|
|
// validate service defaults are resolved in the merged service config
|
|
// expected number of upstreams = (number of upstreams defined in the register request proxy config +
|
|
// 1 centrally configured default from service defaults)
|
|
require.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams))
|
|
for _, up := range v.ServiceProxy.Upstreams {
|
|
if up.DestinationType != "" && up.DestinationType != structs.UpstreamDestTypeService {
|
|
continue
|
|
}
|
|
require.Contains(t, up.Config, "limits")
|
|
upstreamLimits := up.Config["limits"].(*structs.UpstreamLimits)
|
|
require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections)
|
|
require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode)
|
|
}
|
|
}
|
|
|
|
func TestListServiceNodes_MergeCentralConfig(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register the service
|
|
registerServiceReq := registerService(t, a)
|
|
// Register proxy-defaults
|
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
|
// Register service-defaults
|
|
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
|
|
|
type testCase struct {
|
|
testCaseName string
|
|
serviceName string
|
|
connect bool
|
|
}
|
|
|
|
run := func(t *testing.T, tc testCase) {
|
|
url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config", tc.serviceName)
|
|
if tc.connect {
|
|
url = fmt.Sprintf("/v1/catalog/connect/%s?merge-central-config", tc.serviceName)
|
|
}
|
|
req, _ := http.NewRequest("GET", url, nil)
|
|
resp := httptest.NewRecorder()
|
|
var obj interface{}
|
|
var err error
|
|
if tc.connect {
|
|
obj, err = a.srv.CatalogConnectServiceNodes(resp, req)
|
|
} else {
|
|
obj, err = a.srv.CatalogServiceNodes(resp, req)
|
|
}
|
|
|
|
require.NoError(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
serviceNodes := obj.(structs.ServiceNodes)
|
|
|
|
// validate response
|
|
require.Len(t, serviceNodes, 1)
|
|
v := serviceNodes[0]
|
|
|
|
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
|
}
|
|
testCases := []testCase{
|
|
{
|
|
testCaseName: "List service instances with merge-central-config",
|
|
serviceName: registerServiceReq.Service.Service,
|
|
},
|
|
{
|
|
testCaseName: "List connect capable service instances with merge-central-config",
|
|
serviceName: registerServiceReq.Service.Proxy.DestinationServiceName,
|
|
connect: true,
|
|
},
|
|
}
|
|
for _, tc := range testCases {
|
|
t.Run(tc.testCaseName, func(t *testing.T) {
|
|
run(t, tc)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCatalogServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register the service
|
|
registerServiceReq := registerService(t, a)
|
|
// Register proxy-defaults
|
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
|
|
|
// Run the query
|
|
rpcReq := structs.ServiceSpecificRequest{
|
|
Datacenter: "dc1",
|
|
ServiceName: registerServiceReq.Service.Service,
|
|
MergeCentralConfig: true,
|
|
}
|
|
var rpcResp structs.IndexedServiceNodes
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.ServiceNodes", &rpcReq, &rpcResp))
|
|
|
|
require.Len(t, rpcResp.ServiceNodes, 1)
|
|
serviceNode := rpcResp.ServiceNodes[0]
|
|
require.Equal(t, registerServiceReq.Service.Service, serviceNode.ServiceName)
|
|
// validate proxy global defaults are resolved in the merged service config
|
|
require.Equal(t, proxyGlobalEntry.Config, serviceNode.ServiceProxy.Config)
|
|
require.Equal(t, proxyGlobalEntry.Mode, serviceNode.ServiceProxy.Mode)
|
|
|
|
// Async cause a change - register service defaults
|
|
waitIndex := rpcResp.Index
|
|
start := time.Now()
|
|
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
// Register service-defaults
|
|
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
|
}()
|
|
|
|
const waitDuration = 3 * time.Second
|
|
RUN_BLOCKING_QUERY:
|
|
url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config&wait=%s&index=%d",
|
|
registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
|
|
req, _ := http.NewRequest("GET", url, nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
|
|
|
require.NoError(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
elapsed := time.Since(start)
|
|
idx := getIndex(t, resp)
|
|
if idx < waitIndex {
|
|
t.Fatalf("bad index returned: %v", idx)
|
|
} else if idx == waitIndex {
|
|
if elapsed > waitDuration {
|
|
// This should prevent the loop from running longer than the waitDuration
|
|
t.Fatalf("too slow: %v", elapsed)
|
|
}
|
|
goto RUN_BLOCKING_QUERY
|
|
}
|
|
// Should block at least 100ms before getting the changed results
|
|
if elapsed < 100*time.Millisecond {
|
|
t.Fatalf("too fast: %v", elapsed)
|
|
}
|
|
|
|
serviceNodes := obj.(structs.ServiceNodes)
|
|
|
|
// validate response
|
|
require.Len(t, serviceNodes, 1)
|
|
v := serviceNodes[0]
|
|
|
|
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
|
}
|
|
|
|
// Test that the Connect-compatible endpoints can be queried for a
|
|
// service via /v1/catalog/connect/:service.
|
|
func TestCatalogConnectServiceNodes_good(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register
|
|
args := structs.TestRegisterRequestProxy(t)
|
|
args.Service.Address = "127.0.0.55"
|
|
var out struct{}
|
|
assert.Nil(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
|
"/v1/catalog/connect/%s", args.Service.Proxy.DestinationServiceName), nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogConnectServiceNodes(resp, req)
|
|
assert.Nil(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
assert.Len(t, nodes, 1)
|
|
assert.Equal(t, structs.ServiceKindConnectProxy, nodes[0].ServiceKind)
|
|
assert.Equal(t, args.Service.Address, nodes[0].ServiceAddress)
|
|
assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
|
|
}
|
|
|
|
func TestCatalogConnectServiceNodes_Filter(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register
|
|
args := structs.TestRegisterRequestProxy(t)
|
|
args.Service.Address = "127.0.0.55"
|
|
var out struct{}
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
args = structs.TestRegisterRequestProxy(t)
|
|
args.Service.Address = "127.0.0.55"
|
|
args.Service.Meta = map[string]string{
|
|
"version": "2",
|
|
}
|
|
args.Service.ID = "web-proxy2"
|
|
args.SkipNodeUpdate = true
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
|
"/v1/catalog/connect/%s?filter=%s",
|
|
args.Service.Proxy.DestinationServiceName,
|
|
url.QueryEscape("ServiceMeta.version == 2")), nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogConnectServiceNodes(resp, req)
|
|
require.NoError(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
nodes := obj.(structs.ServiceNodes)
|
|
require.Len(t, nodes, 1)
|
|
require.Equal(t, structs.ServiceKindConnectProxy, nodes[0].ServiceKind)
|
|
require.Equal(t, args.Service.Address, nodes[0].ServiceAddress)
|
|
require.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
|
|
}
|
|
|
|
func TestCatalogNodeServices(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register node with a regular service and connect proxy
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
Tags: []string{"a"},
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Register a connect proxy
|
|
args.Service = structs.TestNodeServiceProxy(t)
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc1", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodeServices(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
assertIndex(t, resp)
|
|
|
|
services := obj.(*structs.NodeServices)
|
|
if len(services.Services) != 2 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
|
|
// Proxy service should have it's config intact
|
|
require.Equal(t, args.Service.Proxy, services.Services["web-proxy"].Proxy)
|
|
}
|
|
|
|
func TestCatalogNodeServiceList(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register node with a regular service and connect proxy
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
Tags: []string{"a"},
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Register a connect proxy
|
|
args.Service = structs.TestNodeServiceProxy(t)
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/node-services/foo?dc=dc1", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodeServiceList(resp, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
assertIndex(t, resp)
|
|
|
|
services := obj.(*structs.NodeServiceList)
|
|
if len(services.Services) != 2 {
|
|
t.Fatalf("bad: %v", obj)
|
|
}
|
|
|
|
var proxySvc *structs.NodeService
|
|
for _, svc := range services.Services {
|
|
if svc.ID == "web-proxy" {
|
|
proxySvc = svc
|
|
}
|
|
}
|
|
require.NotNil(t, proxySvc, "Missing proxy service registration")
|
|
// Proxy service should have it's config intact
|
|
require.Equal(t, args.Service.Proxy, proxySvc.Proxy)
|
|
}
|
|
|
|
func TestCatalogNodeServiceList_MergeCentralConfig(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register the service
|
|
registerServiceReq := registerService(t, a)
|
|
// Register proxy-defaults
|
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
|
// Register service-defaults
|
|
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
|
|
|
url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config", registerServiceReq.Node)
|
|
req, _ := http.NewRequest("GET", url, nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodeServiceList(resp, req)
|
|
require.NoError(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
nodeServices := obj.(*structs.NodeServiceList)
|
|
// validate response
|
|
require.Len(t, nodeServices.Services, 1)
|
|
validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
|
}
|
|
|
|
func TestCatalogNodeServiceList_MergeCentralConfigBlocking(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
|
|
|
// Register the service
|
|
registerServiceReq := registerService(t, a)
|
|
// Register proxy-defaults
|
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
|
|
|
// Run the query
|
|
rpcReq := structs.NodeSpecificRequest{
|
|
Datacenter: "dc1",
|
|
Node: registerServiceReq.Node,
|
|
MergeCentralConfig: true,
|
|
}
|
|
var rpcResp structs.IndexedNodeServiceList
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.NodeServiceList", &rpcReq, &rpcResp))
|
|
require.Len(t, rpcResp.NodeServices.Services, 1)
|
|
nodeService := rpcResp.NodeServices.Services[0]
|
|
require.Equal(t, registerServiceReq.Service.Service, nodeService.Service)
|
|
// validate proxy global defaults are resolved in the merged service config
|
|
require.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config)
|
|
require.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode)
|
|
|
|
// Async cause a change - register service defaults
|
|
waitIndex := rpcResp.Index
|
|
start := time.Now()
|
|
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
// Register service-defaults
|
|
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
|
}()
|
|
|
|
const waitDuration = 3 * time.Second
|
|
RUN_BLOCKING_QUERY:
|
|
|
|
url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config&wait=%s&index=%d",
|
|
registerServiceReq.Node, waitDuration.String(), waitIndex)
|
|
req, _ := http.NewRequest("GET", url, nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodeServiceList(resp, req)
|
|
require.NoError(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
elapsed := time.Since(start)
|
|
idx := getIndex(t, resp)
|
|
if idx < waitIndex {
|
|
t.Fatalf("bad index returned: %v", idx)
|
|
} else if idx == waitIndex {
|
|
if elapsed > waitDuration {
|
|
// This should prevent the loop from running longer than the waitDuration
|
|
t.Fatalf("too slow: %v", elapsed)
|
|
}
|
|
goto RUN_BLOCKING_QUERY
|
|
}
|
|
// Should block at least 100ms before getting the changed results
|
|
if elapsed < 100*time.Millisecond {
|
|
t.Fatalf("too fast: %v", elapsed)
|
|
}
|
|
|
|
nodeServices := obj.(*structs.NodeServiceList)
|
|
// validate response
|
|
require.Len(t, nodeServices.Services, 1)
|
|
validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
|
}
|
|
|
|
func TestCatalogNodeServices_Filter(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register node with a regular service and connect proxy
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: "api",
|
|
Tags: []string{"a"},
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
// Register a connect proxy
|
|
args.Service = structs.TestNodeServiceProxy(t)
|
|
require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc1&filter="+url.QueryEscape("Kind == `connect-proxy`"), nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodeServices(resp, req)
|
|
require.NoError(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
services := obj.(*structs.NodeServices)
|
|
require.Len(t, services.Services, 1)
|
|
|
|
// Proxy service should have it's config intact
|
|
require.Equal(t, args.Service.Proxy, services.Services["web-proxy"].Proxy)
|
|
}
|
|
|
|
// Test that the services on a node contain all the Connect proxies on
|
|
// the node as well with their fields properly populated.
|
|
func TestCatalogNodeServices_ConnectProxy(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register
|
|
args := structs.TestRegisterRequestProxy(t)
|
|
var out struct{}
|
|
assert.Nil(t, a.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
|
|
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
|
"/v1/catalog/node/%s", args.Node), nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogNodeServices(resp, req)
|
|
assert.Nil(t, err)
|
|
assertIndex(t, resp)
|
|
|
|
ns := obj.(*structs.NodeServices)
|
|
assert.Len(t, ns.Services, 1)
|
|
v := ns.Services[args.Service.Service]
|
|
assert.Equal(t, structs.ServiceKindConnectProxy, v.Kind)
|
|
}
|
|
|
|
func TestCatalogNodeServices_WanTranslation(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a1 := NewTestAgent(t, `
|
|
datacenter = "dc1"
|
|
translate_wan_addrs = true
|
|
acl_datacenter = ""
|
|
`)
|
|
defer a1.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a1.RPC, "dc1")
|
|
|
|
a2 := NewTestAgent(t, `
|
|
datacenter = "dc2"
|
|
translate_wan_addrs = true
|
|
acl_datacenter = ""
|
|
`)
|
|
defer a2.Shutdown()
|
|
testrpc.WaitForTestAgent(t, a2.RPC, "dc2")
|
|
|
|
// Wait for the WAN join.
|
|
addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
|
|
_, err := a2.srv.agent.JoinWAN([]string{addr})
|
|
require.NoError(t, err)
|
|
retry.Run(t, func(r *retry.R) {
|
|
require.Len(r, a1.WANMembers(), 2)
|
|
})
|
|
|
|
// Register a node with DC2.
|
|
{
|
|
args := &structs.RegisterRequest{
|
|
Datacenter: "dc2",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
TaggedAddresses: map[string]string{
|
|
"wan": "127.0.0.2",
|
|
},
|
|
Service: &structs.NodeService{
|
|
Service: "http_wan_translation_test",
|
|
Address: "127.0.0.1",
|
|
Port: 8080,
|
|
TaggedAddresses: map[string]structs.ServiceAddress{
|
|
"wan": {
|
|
Address: "1.2.3.4",
|
|
Port: 80,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
var out struct{}
|
|
require.NoError(t, a2.RPC(context.Background(), "Catalog.Register", args, &out))
|
|
}
|
|
|
|
// Query for the node in DC2 from DC1.
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc2", nil)
|
|
resp1 := httptest.NewRecorder()
|
|
obj1, err1 := a1.srv.CatalogNodeServices(resp1, req)
|
|
require.NoError(t, err1)
|
|
require.NoError(t, checkIndex(resp1))
|
|
|
|
// Expect that DC1 gives us a WAN address (since the node is in DC2).
|
|
service1, ok := obj1.(*structs.NodeServices)
|
|
require.True(t, ok, "obj1 is not a *structs.NodeServices")
|
|
require.NotNil(t, service1.Node)
|
|
require.Equal(t, service1.Node.Address, "127.0.0.2")
|
|
require.Len(t, service1.Services, 1)
|
|
ns1, ok := service1.Services["http_wan_translation_test"]
|
|
require.True(t, ok, "Missing service http_wan_translation_test")
|
|
require.Equal(t, "1.2.3.4", ns1.Address)
|
|
require.Equal(t, 80, ns1.Port)
|
|
|
|
// Query DC2 from DC2.
|
|
resp2 := httptest.NewRecorder()
|
|
obj2, err2 := a2.srv.CatalogNodeServices(resp2, req)
|
|
require.NoError(t, err2)
|
|
require.NoError(t, checkIndex(resp2))
|
|
|
|
// Expect that DC2 gives us a private address (since the node is in DC2).
|
|
service2 := obj2.(*structs.NodeServices)
|
|
require.True(t, ok, "obj2 is not a *structs.NodeServices")
|
|
require.NotNil(t, service2.Node)
|
|
require.Equal(t, service2.Node.Address, "127.0.0.1")
|
|
require.Len(t, service2.Services, 1)
|
|
ns2, ok := service2.Services["http_wan_translation_test"]
|
|
require.True(t, ok, "Missing service http_wan_translation_test")
|
|
require.Equal(t, ns2.Address, "127.0.0.1")
|
|
require.Equal(t, ns2.Port, 8080)
|
|
}
|
|
|
|
func TestCatalog_GatewayServices_Terminating(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Register a service to be covered by the wildcard in the config entry
|
|
args := structs.TestRegisterRequest(t)
|
|
args.Service.Service = "redis"
|
|
args.Check = &structs.HealthCheck{
|
|
Name: "redis",
|
|
Status: api.HealthPassing,
|
|
ServiceID: args.Service.Service,
|
|
}
|
|
var out struct{}
|
|
assert.NoError(t, a.RPC(context.Background(), "Catalog.Register", &args, &out))
|
|
|
|
// Associate the gateway and api/redis services
|
|
entryArgs := &structs.ConfigEntryRequest{
|
|
Op: structs.ConfigEntryUpsert,
|
|
Datacenter: "dc1",
|
|
Entry: &structs.TerminatingGatewayConfigEntry{
|
|
Kind: "terminating-gateway",
|
|
Name: "terminating",
|
|
Services: []structs.LinkedService{
|
|
{
|
|
Name: "api",
|
|
CAFile: "api/ca.crt",
|
|
CertFile: "api/client.crt",
|
|
KeyFile: "api/client.key",
|
|
SNI: "my-domain",
|
|
},
|
|
{
|
|
Name: "*",
|
|
CAFile: "ca.crt",
|
|
CertFile: "client.crt",
|
|
KeyFile: "client.key",
|
|
SNI: "my-alt-domain",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
var entryResp bool
|
|
assert.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &entryArgs, &entryResp))
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/gateway-services/terminating", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogGatewayServices(resp, req)
|
|
assert.NoError(r, err)
|
|
|
|
header := resp.Header().Get("X-Consul-Index")
|
|
if header == "" || header == "0" {
|
|
r.Fatalf("Bad: %v", header)
|
|
}
|
|
|
|
gatewayServices := obj.(structs.GatewayServices)
|
|
|
|
expect := structs.GatewayServices{
|
|
{
|
|
Service: structs.NewServiceName("api", nil),
|
|
Gateway: structs.NewServiceName("terminating", nil),
|
|
GatewayKind: structs.ServiceKindTerminatingGateway,
|
|
CAFile: "api/ca.crt",
|
|
CertFile: "api/client.crt",
|
|
KeyFile: "api/client.key",
|
|
SNI: "my-domain",
|
|
},
|
|
{
|
|
Service: structs.NewServiceName("redis", nil),
|
|
Gateway: structs.NewServiceName("terminating", nil),
|
|
GatewayKind: structs.ServiceKindTerminatingGateway,
|
|
CAFile: "ca.crt",
|
|
CertFile: "client.crt",
|
|
KeyFile: "client.key",
|
|
SNI: "my-alt-domain",
|
|
FromWildcard: true,
|
|
},
|
|
}
|
|
|
|
// Ignore raft index for equality
|
|
for _, s := range gatewayServices {
|
|
s.RaftIndex = structs.RaftIndex{}
|
|
}
|
|
assert.Equal(r, expect, gatewayServices)
|
|
})
|
|
}
|
|
|
|
func TestCatalog_GatewayServices_Ingress(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
// Associate an ingress gateway with api/redis
|
|
entryArgs := &structs.ConfigEntryRequest{
|
|
Op: structs.ConfigEntryUpsert,
|
|
Datacenter: "dc1",
|
|
Entry: &structs.IngressGatewayConfigEntry{
|
|
Kind: "ingress-gateway",
|
|
Name: "ingress",
|
|
Listeners: []structs.IngressListener{
|
|
{
|
|
Port: 8888,
|
|
Services: []structs.IngressService{
|
|
{
|
|
Name: "api",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Port: 9999,
|
|
Services: []structs.IngressService{
|
|
{
|
|
Name: "redis",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
var entryResp bool
|
|
require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &entryArgs, &entryResp))
|
|
|
|
retry.Run(t, func(r *retry.R) {
|
|
req, _ := http.NewRequest("GET", "/v1/catalog/gateway-services/ingress", nil)
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.CatalogGatewayServices(resp, req)
|
|
require.NoError(r, err)
|
|
|
|
header := resp.Header().Get("X-Consul-Index")
|
|
if header == "" || header == "0" {
|
|
r.Fatalf("Bad: %v", header)
|
|
}
|
|
|
|
gatewayServices := obj.(structs.GatewayServices)
|
|
|
|
expect := structs.GatewayServices{
|
|
{
|
|
Service: structs.NewServiceName("api", nil),
|
|
Gateway: structs.NewServiceName("ingress", nil),
|
|
GatewayKind: structs.ServiceKindIngressGateway,
|
|
Protocol: "tcp",
|
|
Port: 8888,
|
|
},
|
|
{
|
|
Service: structs.NewServiceName("redis", nil),
|
|
Gateway: structs.NewServiceName("ingress", nil),
|
|
GatewayKind: structs.ServiceKindIngressGateway,
|
|
Protocol: "tcp",
|
|
Port: 9999,
|
|
},
|
|
}
|
|
|
|
// Ignore raft index for equality
|
|
for _, s := range gatewayServices {
|
|
s.RaftIndex = structs.RaftIndex{}
|
|
}
|
|
require.Equal(r, expect, gatewayServices)
|
|
})
|
|
}
|
|
|
|
func TestCatalogRegister_AssignManualServiceVIPs(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("too slow for testing.Short")
|
|
}
|
|
|
|
t.Parallel()
|
|
a := NewTestAgent(t, "")
|
|
defer a.Shutdown()
|
|
|
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
|
|
for _, service := range []string{"api", "web"} {
|
|
req := structs.ConfigEntryRequest{
|
|
Datacenter: "dc1",
|
|
Entry: &structs.ServiceResolverConfigEntry{
|
|
Kind: structs.ServiceResolver,
|
|
Name: service,
|
|
},
|
|
}
|
|
var out bool
|
|
require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &req, &out))
|
|
}
|
|
|
|
assignVIPs := func(req structs.AssignServiceManualVIPsRequest, expect structs.AssignServiceManualVIPsResponse) {
|
|
httpReq, _ := http.NewRequest("PUT", "/v1/internal/service-virtual-ip", jsonReader(req))
|
|
resp := httptest.NewRecorder()
|
|
obj, err := a.srv.AssignManualServiceVIPs(resp, httpReq)
|
|
require.NoError(t, err)
|
|
|
|
result, ok := obj.(structs.AssignServiceManualVIPsResponse)
|
|
require.True(t, ok)
|
|
require.Equal(t, expect, result)
|
|
}
|
|
|
|
// Assign some manual IPs to the service
|
|
assignVIPs(structs.AssignServiceManualVIPsRequest{
|
|
Service: "api",
|
|
ManualVIPs: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"},
|
|
}, structs.AssignServiceManualVIPsResponse{
|
|
Found: true,
|
|
})
|
|
|
|
// Assign some manual IPs to the new service, reassigning one from the existing service.
|
|
assignVIPs(structs.AssignServiceManualVIPsRequest{
|
|
Service: "web",
|
|
ManualVIPs: []string{"2.2.2.2", "4.4.4.4"},
|
|
}, structs.AssignServiceManualVIPsResponse{
|
|
Found: true,
|
|
UnassignedFrom: []structs.PeeredServiceName{
|
|
{
|
|
ServiceName: structs.ServiceName{Name: "api", EnterpriseMeta: *acl.DefaultEnterpriseMeta()},
|
|
},
|
|
},
|
|
})
|
|
|
|
// Assign some manual IPs a non-existent service, should be a no-op.
|
|
assignVIPs(structs.AssignServiceManualVIPsRequest{
|
|
Service: "nope",
|
|
ManualVIPs: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4"},
|
|
}, structs.AssignServiceManualVIPsResponse{
|
|
Found: false,
|
|
})
|
|
}
|