// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: BUSL-1.1 package agent import ( "context" "fmt" "io" "net/http" "net/http/httptest" "net/url" "strings" "testing" "time" "github.com/hashicorp/serf/coordinate" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" ) func TestCatalogEndpointsFailInV2(t *testing.T) { t.Parallel() a := NewTestAgent(t, `experiments = ["resource-apis"]`) checkRequest := func(method, url string) { t.Run(method+" "+url, func(t *testing.T) { assertV1CatalogEndpointDoesNotWorkWithV2(t, a, method, url, "{}") }) } checkRequest("PUT", "/v1/catalog/register") checkRequest("GET", "/v1/catalog/connect/") checkRequest("PUT", "/v1/catalog/deregister") checkRequest("GET", "/v1/catalog/datacenters") checkRequest("GET", "/v1/catalog/nodes") checkRequest("GET", "/v1/catalog/services") checkRequest("GET", "/v1/catalog/service/") checkRequest("GET", "/v1/catalog/node/") checkRequest("GET", "/v1/catalog/node-services/") checkRequest("GET", "/v1/catalog/gateway-services/") } func assertV1CatalogEndpointDoesNotWorkWithV2(t *testing.T, a *TestAgent, method, url string, requestBody string) { var body io.Reader switch method { case http.MethodPost, http.MethodPut: body = strings.NewReader(requestBody + "\n") } req, err := http.NewRequest(method, url, body) require.NoError(t, err) resp := httptest.NewRecorder() a.srv.h.ServeHTTP(resp, req) require.Equal(t, http.StatusBadRequest, resp.Code) got, err := io.ReadAll(resp.Body) require.NoError(t, err) require.Contains(t, string(got), structs.ErrUsingV2CatalogExperiment.Error()) } func TestCatalogRegister_PeeringRegistration(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() t.Run("deny peer registrations by default", func(t *testing.T) { a := 
NewTestAgent(t, "") defer a.Shutdown() // Register request with peer args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"} req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args)) obj, err := a.srv.CatalogRegister(nil, req) require.Error(t, err) require.Contains(t, err.Error(), "cannot register requests with PeerName in them") require.Nil(t, obj) }) t.Run("cannot hcl set the peer registrations config", func(t *testing.T) { // this will have no effect, as the value is overriden in non user source a := NewTestAgent(t, "peering = { test_allow_peer_registrations = true }") defer a.Shutdown() // Register request with peer args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"} req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args)) obj, err := a.srv.CatalogRegister(nil, req) require.Error(t, err) require.Contains(t, err.Error(), "cannot register requests with PeerName in them") require.Nil(t, obj) }) t.Run("allow peer registrations with test overrides", func(t *testing.T) { // the only way to set the config in the agent is via the overrides a := StartTestAgent(t, TestAgent{HCL: ``, Overrides: `peering = { test_allow_peer_registrations = true }`}) defer a.Shutdown() // Register request with peer args := &structs.RegisterRequest{Node: "foo", PeerName: "foo", Address: "127.0.0.1"} req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args)) obj, err := a.srv.CatalogRegister(nil, req) require.NoError(t, err) applied, ok := obj.(bool) require.True(t, ok) require.True(t, applied) }) } func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() for _, addr := range []string{"0.0.0.0", "::", "[::]"} { t.Run("addr "+addr, func(t *testing.T) { args := &structs.RegisterRequest{ Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: 
"test", Address: addr, Port: 8080, }, } req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args)) _, err := a.srv.CatalogRegister(nil, req) if err == nil || err.Error() != "Invalid service address" { t.Fatalf("err: %v", err) } }) } } func TestCatalogDeregister(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() // Register node args := &structs.DeregisterRequest{Node: "foo"} req, _ := http.NewRequest("PUT", "/v1/catalog/deregister", jsonReader(args)) obj, err := a.srv.CatalogDeregister(nil, req) if err != nil { t.Fatalf("err: %v", err) } res := obj.(bool) if res != true { t.Fatalf("bad: %v", res) } } func TestCatalogDatacenters(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() retry.Run(t, func(r *retry.R) { req, _ := http.NewRequest("GET", "/v1/catalog/datacenters", nil) obj, err := a.srv.CatalogDatacenters(nil, req) if err != nil { r.Fatal(err) } dcs := obj.([]string) if got, want := len(dcs), 1; got != want { r.Fatalf("got %d data centers want %d", got, want) } }) } func TestCatalogNodes(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } req, _ := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } // Verify an index is set assertIndex(t, resp) nodes := obj.(structs.Nodes) if len(nodes) != 2 { t.Fatalf("bad: %v ; nodes:=%v", obj, nodes) } } func TestCatalogNodes_MetaFilter(t *testing.T) { if testing.Short() { 
t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register a node with a meta field args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", NodeMeta: map[string]string{ "somekey": "somevalue", }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } req, _ := http.NewRequest("GET", "/v1/catalog/nodes?node-meta=somekey:somevalue", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } // Verify an index is set assertIndex(t, resp) // Verify we only get the node with the correct meta field back nodes := obj.(structs.Nodes) if len(nodes) != 1 { t.Fatalf("bad: %v", obj) } if v, ok := nodes[0].Meta["somekey"]; !ok || v != "somevalue" { t.Fatalf("bad: %v", nodes[0].Meta) } } func TestCatalogNodes_Filter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register a node with a meta field args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", NodeMeta: map[string]string{ "somekey": "somevalue", }, } var out struct{} require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", "/v1/catalog/nodes?filter="+url.QueryEscape("Meta.somekey == somevalue"), nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodes(resp, req) require.NoError(t, err) // Verify an index is set assertIndex(t, resp) // Verify we only get the node with the correct meta field back nodes := obj.(structs.Nodes) require.Len(t, nodes, 1) v, ok := nodes[0].Meta["somekey"] require.True(t, ok) require.Equal(t, v, "somevalue") } func TestCatalogNodes_WanTranslation(t *testing.T) { if testing.Short() { t.Skip("too slow for 
testing.Short") } t.Parallel() a1 := NewTestAgent(t, ` datacenter = "dc1" translate_wan_addrs = true acl_datacenter = "" `) defer a1.Shutdown() testrpc.WaitForTestAgent(t, a1.RPC, "dc1") a2 := NewTestAgent(t, ` datacenter = "dc2" translate_wan_addrs = true acl_datacenter = "" `) defer a2.Shutdown() testrpc.WaitForTestAgent(t, a2.RPC, "dc2") // Wait for the WAN join. addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN) if _, err := a2.JoinWAN([]string{addr}); err != nil { t.Fatalf("err: %v", err) } testrpc.WaitForLeader(t, a1.RPC, "dc1") testrpc.WaitForLeader(t, a2.RPC, "dc2") retry.Run(t, func(r *retry.R) { if got, want := len(a1.WANMembers()), 2; got < want { r.Fatalf("got %d WAN members want at least %d", got, want) } }) // Register a node with DC2. { args := &structs.RegisterRequest{ Datacenter: "dc2", Node: "wan_translation_test", Address: "127.0.0.1", TaggedAddresses: map[string]string{ "wan": "127.0.0.2", }, Service: &structs.NodeService{ Service: "http_wan_translation_test", }, } var out struct{} if err := a2.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } } // Query nodes in DC2 from DC1. req, _ := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc2", nil) resp1 := httptest.NewRecorder() obj1, err1 := a1.srv.CatalogNodes(resp1, req) if err1 != nil { t.Fatalf("err: %v", err1) } assertIndex(t, resp1) // Expect that DC1 gives us a WAN address (since the node is in DC2). nodes1 := obj1.(structs.Nodes) if len(nodes1) != 2 { t.Fatalf("bad: %v, nodes:=%v", obj1, nodes1) } var address string for _, node := range nodes1 { if node.Node == "wan_translation_test" { address = node.Address } } if address != "127.0.0.2" { t.Fatalf("bad: %s", address) } // Query DC2 from DC2. resp2 := httptest.NewRecorder() obj2, err2 := a2.srv.CatalogNodes(resp2, req) if err2 != nil { t.Fatalf("err: %v", err2) } assertIndex(t, resp2) // Expect that DC2 gives us a private address (since the node is in DC2). 
nodes2 := obj2.(structs.Nodes) if len(nodes2) != 2 { t.Fatalf("bad: %v", obj2) } for _, node := range nodes2 { if node.Node == "wan_translation_test" { address = node.Address } } if address != "127.0.0.1" { t.Fatalf("bad: %s", address) } } func TestCatalogNodes_Blocking(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WaitForAntiEntropySync()) // Run the query args := &structs.DCSpecificRequest{ Datacenter: "dc1", } var out structs.IndexedNodes if err := a.RPC(context.Background(), "Catalog.ListNodes", *args, &out); err != nil { t.Fatalf("err: %v", err) } // Async cause a change waitIndex := out.Index start := time.Now() go func() { time.Sleep(100 * time.Millisecond) args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Errorf("err: %v", err) } }() const waitDuration = 3 * time.Second // Re-run the query, if errantly woken up with no change, resume blocking. 
var elapsed time.Duration RUN_BLOCKING_QUERY: req, err := http.NewRequest("GET", fmt.Sprintf("/v1/catalog/nodes?wait=%s&index=%d", waitDuration.String(), waitIndex), nil) if err != nil { t.Fatalf("err: %v", err) } resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } elapsed = time.Since(start) idx := getIndex(t, resp) if idx < waitIndex { t.Fatalf("bad: %v", idx) } else if idx == waitIndex { if elapsed > waitDuration { // This should prevent the loop from running longer than the // waitDuration t.Fatalf("too slow: %v", elapsed) } goto RUN_BLOCKING_QUERY } // Should block at least 100ms before getting the changed results if elapsed < 100*time.Millisecond { t.Fatalf("too fast: %v", elapsed) } nodes := obj.(structs.Nodes) if len(nodes) != 2 { t.Fatalf("bad: %v", obj) } } func TestCatalogNodes_DistanceSort(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register nodes. args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", } var out struct{} require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) args = &structs.RegisterRequest{ Datacenter: "dc1", Node: "bar", Address: "127.0.0.2", } require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) // Nobody has coordinates set so this will still return them in the // order they are indexed. req, _ := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1&near=foo", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodes(resp, req) require.NoError(t, err) assertIndex(t, resp) nodes := obj.(structs.Nodes) require.Len(t, nodes, 3) require.Equal(t, "bar", nodes[0].Node) require.Equal(t, "foo", nodes[1].Node) require.Equal(t, a.Config.NodeName, nodes[2].Node) // Send an update for the node and wait for it to get applied. 
arg := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "foo", Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), } require.NoError(t, a.RPC(context.Background(), "Coordinate.Update", &arg, &out)) time.Sleep(300 * time.Millisecond) // Query again and now foo should have moved to the front of the line. req, _ = http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1&near=foo", nil) resp = httptest.NewRecorder() obj, err = a.srv.CatalogNodes(resp, req) require.NoError(t, err) assertIndex(t, resp) nodes = obj.(structs.Nodes) require.Len(t, nodes, 3) require.Equal(t, "foo", nodes[0].Node) require.Equal(t, "bar", nodes[1].Node) require.Equal(t, a.Config.NodeName, nodes[2].Node) } func TestCatalogServices(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register node args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } req, _ := http.NewRequest("GET", "/v1/catalog/services?dc=dc1", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServices(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) services := obj.(structs.Services) if len(services) != 2 { t.Fatalf("bad: %v", obj) } } func TestCatalogServices_NodeMetaFilter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() // Register node args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", NodeMeta: map[string]string{ "somekey": "somevalue", }, Service: &structs.NodeService{ Service: "api", }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } req, _ 
:= http.NewRequest("GET", "/v1/catalog/services?node-meta=somekey:somevalue", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServices(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) services := obj.(structs.Services) if len(services) != 1 { t.Fatalf("bad: %v", obj) } if _, ok := services[args.Service.Service]; !ok { t.Fatalf("bad: %v", services) } } func TestCatalogRegister_checkRegistration(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() // Register node with a service and check check := structs.HealthCheck{ Node: "foo", CheckID: "foo-check", Name: "foo check", ServiceID: "api", Definition: structs.HealthCheckDefinition{ TCP: "localhost:8888", Interval: 5 * time.Second, }, } args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", }, Check: &check, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } retry.Run(t, func(r *retry.R) { req, _ := http.NewRequest("GET", "/v1/health/checks/api", nil) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceChecks(resp, req) if err != nil { r.Fatalf("err: %v", err) } checks := obj.(structs.HealthChecks) if len(checks) != 1 { r.Fatalf("expected 1 check, got: %d", len(checks)) } if checks[0].CheckID != check.CheckID { r.Fatalf("expected check id %s, got %s", check.Type, checks[0].Type) } if checks[0].Type != "tcp" { r.Fatalf("expected check type tcp, got %s", checks[0].Type) } }) } func TestCatalogRegister_checkRegistration_UDP(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() // Register node with a service and check check := structs.HealthCheck{ Node: "foo", CheckID: "foo-check", Name: "foo check", ServiceID: "api", Definition: structs.HealthCheckDefinition{ 
UDP: "localhost:8888", Interval: 5 * time.Second, }, } args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", }, Check: &check, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } retry.Run(t, func(r *retry.R) { req, _ := http.NewRequest("GET", "/v1/health/checks/api", nil) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceChecks(resp, req) if err != nil { r.Fatalf("err: %v", err) } checks := obj.(structs.HealthChecks) if len(checks) != 1 { r.Fatalf("expected 1 check, got: %d", len(checks)) } if checks[0].CheckID != check.CheckID { r.Fatalf("expected check id %s, got %s", check.Type, checks[0].Type) } if checks[0].Type != "udp" { r.Fatalf("expected check type udp, got %s", checks[0].Type) } }) } func TestCatalogServiceNodes(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() // Make sure an empty list is returned, not a nil { req, _ := http.NewRequest("GET", "/v1/catalog/service/api?tag=a", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) if nodes == nil || len(nodes) != 0 { t.Fatalf("bad: %v", obj) } } // Register node args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", Tags: []string{"a"}, }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } req, _ := http.NewRequest("GET", "/v1/catalog/service/api?tag=a", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) if len(nodes) != 1 { t.Fatalf("bad: 
%v", obj) } // Test caching { // List instances with cache enabled req, _ := http.NewRequest("GET", "/v1/catalog/service/api?cached", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) require.NoError(t, err) nodes := obj.(structs.ServiceNodes) assert.Len(t, nodes, 1) // Should be a cache miss assert.Equal(t, "MISS", resp.Header().Get("X-Cache")) } { // List instances with cache enabled req, _ := http.NewRequest("GET", "/v1/catalog/service/api?cached", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) require.NoError(t, err) nodes := obj.(structs.ServiceNodes) assert.Len(t, nodes, 1) // Should be a cache HIT now! assert.Equal(t, "HIT", resp.Header().Get("X-Cache")) assert.Equal(t, "0", resp.Header().Get("Age")) } // Ensure background refresh works { // Register a new instance of the service args2 := args args2.Node = "bar" args2.Address = "127.0.0.2" require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) retry.Run(t, func(r *retry.R) { // List it again req, _ := http.NewRequest("GET", "/v1/catalog/service/api?cached", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) r.Check(err) nodes := obj.(structs.ServiceNodes) if len(nodes) != 2 { r.Fatalf("Want 2 nodes") } // Should be a cache hit! The data should've updated in the cache // in the background so this should've been fetched directly from // the cache. 
if resp.Header().Get("X-Cache") != "HIT" { r.Fatalf("should be a cache hit") } }) } } func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() // Make sure an empty list is returned, not a nil { req, _ := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) if nodes == nil || len(nodes) != 0 { t.Fatalf("bad: %v", obj) } } // Register node args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", NodeMeta: map[string]string{ "somekey": "somevalue", }, Service: &structs.NodeService{ Service: "api", }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } req, _ := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) if len(nodes) != 1 { t.Fatalf("bad: %v", obj) } } func TestCatalogServiceNodes_Filter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() queryPath := "/v1/catalog/service/api?filter=" + url.QueryEscape("ServiceMeta.somekey == somevalue") // Make sure an empty list is returned, not a nil { req, _ := http.NewRequest("GET", queryPath, nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) require.NoError(t, err) assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) require.Empty(t, nodes) } // Register node args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: 
&structs.NodeService{ Service: "api", Meta: map[string]string{ "somekey": "somevalue", }, }, } var out struct{} require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) // Register a second service for the node args = &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ ID: "api2", Service: "api", Meta: map[string]string{ "somekey": "notvalue", }, }, SkipNodeUpdate: true, } require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", queryPath, nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) require.NoError(t, err) assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) require.Len(t, nodes, 1) } func TestCatalogServiceNodes_WanTranslation(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a1 := NewTestAgent(t, ` datacenter = "dc1" translate_wan_addrs = true acl_datacenter = "" `) defer a1.Shutdown() a2 := NewTestAgent(t, ` datacenter = "dc2" translate_wan_addrs = true acl_datacenter = "" `) defer a2.Shutdown() // Wait for the WAN join. addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN) _, err := a2.srv.agent.JoinWAN([]string{addr}) require.NoError(t, err) retry.Run(t, func(r *retry.R) { require.Len(r, a1.WANMembers(), 2) }) // Register a node with DC2. { args := &structs.RegisterRequest{ Datacenter: "dc2", Node: "foo", Address: "127.0.0.1", TaggedAddresses: map[string]string{ "wan": "127.0.0.2", }, Service: &structs.NodeService{ Service: "http_wan_translation_test", Address: "127.0.0.1", Port: 8080, TaggedAddresses: map[string]structs.ServiceAddress{ "wan": { Address: "1.2.3.4", Port: 80, }, }, }, } var out struct{} require.NoError(t, a2.RPC(context.Background(), "Catalog.Register", args, &out)) } // Query for the node in DC2 from DC1. 
req, _ := http.NewRequest("GET", "/v1/catalog/service/http_wan_translation_test?dc=dc2", nil) resp1 := httptest.NewRecorder() obj1, err1 := a1.srv.CatalogServiceNodes(resp1, req) require.NoError(t, err1) require.NoError(t, checkIndex(resp1)) // Expect that DC1 gives us a WAN address (since the node is in DC2). nodes1, ok := obj1.(structs.ServiceNodes) require.True(t, ok, "obj1 is not a structs.ServiceNodes") require.Len(t, nodes1, 1) node1 := nodes1[0] require.Equal(t, node1.Address, "127.0.0.2") require.Equal(t, node1.ServiceAddress, "1.2.3.4") require.Equal(t, node1.ServicePort, 80) // Query DC2 from DC2. resp2 := httptest.NewRecorder() obj2, err2 := a2.srv.CatalogServiceNodes(resp2, req) require.NoError(t, err2) require.NoError(t, checkIndex(resp2)) // Expect that DC2 gives us a local address (since the node is in DC2). nodes2, ok := obj2.(structs.ServiceNodes) require.True(t, ok, "obj2 is not a structs.ServiceNodes") require.Len(t, nodes2, 1) node2 := nodes2[0] require.Equal(t, node2.Address, "127.0.0.1") require.Equal(t, node2.ServiceAddress, "127.0.0.1") require.Equal(t, node2.ServicePort, 8080) } func TestCatalogServiceNodes_DistanceSort(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") // Register nodes. 
args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "bar", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", Tags: []string{"a"}, }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } req, _ := http.NewRequest("GET", "/v1/catalog/service/api?tag=a", nil) args = &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.2", Service: &structs.NodeService{ Service: "api", Tags: []string{"a"}, }, } if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } // Nobody has coordinates set so this will still return them in the // order they are indexed. req, _ = http.NewRequest("GET", "/v1/catalog/service/api?tag=a&near=foo", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) if len(nodes) != 2 { t.Fatalf("bad: %v", obj) } if nodes[0].Node != "bar" { t.Fatalf("bad: %v", nodes) } if nodes[1].Node != "foo" { t.Fatalf("bad: %v", nodes) } // Send an update for the node and wait for it to get applied. arg := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "foo", Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), } if err := a.RPC(context.Background(), "Coordinate.Update", &arg, &out); err != nil { t.Fatalf("err: %v", err) } // Eventually foo should move to the front of the line. 
retry.Run(t, func(r *retry.R) { req, _ = http.NewRequest("GET", "/v1/catalog/service/api?tag=a&near=foo", nil) resp = httptest.NewRecorder() obj, err = a.srv.CatalogServiceNodes(resp, req) if err != nil { r.Fatalf("err: %v", err) } assertIndex(r, resp) nodes = obj.(structs.ServiceNodes) if len(nodes) != 2 { r.Fatalf("bad: %v", obj) } if nodes[0].Node != "foo" { r.Fatalf("bad: %v", nodes) } if nodes[1].Node != "bar" { r.Fatalf("bad: %v", nodes) } }) } // Test that connect proxies can be queried via /v1/catalog/service/:service // directly and that their results contain the proxy fields. func TestCatalogServiceNodes_ConnectProxy(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") // Register args := structs.TestRegisterRequestProxy(t) var out struct{} assert.Nil(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", fmt.Sprintf( "/v1/catalog/service/%s", args.Service.Service), nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogServiceNodes(resp, req) assert.Nil(t, err) assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) assert.Len(t, nodes, 1) assert.Equal(t, structs.ServiceKindConnectProxy, nodes[0].ServiceKind) assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy) } func registerService(t *testing.T, a *TestAgent) (registerServiceReq *structs.RegisterRequest) { t.Helper() entMeta := acl.DefaultEnterpriseMeta() registerServiceReq = structs.TestRegisterRequestProxy(t) registerServiceReq.EnterpriseMeta = *entMeta registerServiceReq.Service.EnterpriseMeta = *entMeta registerServiceReq.Service.Proxy.Upstreams = structs.TestAddDefaultsToUpstreams(t, registerServiceReq.Service.Proxy.Upstreams, *entMeta) registerServiceReq.Check = &structs.HealthCheck{ Node: registerServiceReq.Node, Name: "check1", } var out struct{} require.NoError(t, a.RPC(context.Background(), "Catalog.Register", 
registerServiceReq, &out)) return } func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs.ProxyConfigEntry) { t.Helper() // Register proxy-defaults proxyGlobalEntry = structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: structs.ProxyConfigGlobal, Mode: structs.ProxyModeDirect, Config: map[string]interface{}{ "local_connect_timeout_ms": uint64(1000), "handshake_timeout_ms": uint64(1000), }, EnterpriseMeta: *acl.DefaultEnterpriseMeta(), } proxyDefaultsConfigEntryReq := &structs.ConfigEntryRequest{ Op: structs.ConfigEntryUpsert, Datacenter: "dc1", Entry: &proxyGlobalEntry, } var proxyDefaultsConfigEntryResp bool require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp)) return } func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (serviceDefaultsConfigEntry structs.ServiceConfigEntry) { t.Helper() limits := 512 serviceDefaultsConfigEntry = structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: serviceName, Mode: structs.ProxyModeTransparent, UpstreamConfig: &structs.UpstreamConfiguration{ Defaults: &structs.UpstreamConfig{ MeshGateway: structs.MeshGatewayConfig{ Mode: structs.MeshGatewayModeLocal, }, Limits: &structs.UpstreamLimits{ MaxConnections: &limits, }, }, }, EnterpriseMeta: *acl.DefaultEnterpriseMeta(), } serviceDefaultsConfigEntryReq := &structs.ConfigEntryRequest{ Op: structs.ConfigEntryUpsert, Datacenter: "dc1", Entry: &serviceDefaultsConfigEntry, } var serviceDefaultsConfigEntryResp bool require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp)) return } func validateMergeCentralConfigResponse(t *testing.T, v *structs.ServiceNode, registerServiceReq *structs.RegisterRequest, proxyGlobalEntry structs.ProxyConfigEntry, serviceDefaultsConfigEntry structs.ServiceConfigEntry) { t.Helper() require.Equal(t, registerServiceReq.Service.Service, 
v.ServiceName)
	// validate proxy global defaults are resolved in the merged service config
	require.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config)
	// validate service defaults override proxy-defaults/global
	require.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode)
	require.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode)
	// validate service defaults are resolved in the merged service config
	// expected number of upstreams = (number of upstreams defined in the register request proxy config +
	//	1 centrally configured default from service defaults)
	require.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams))
	for _, up := range v.ServiceProxy.Upstreams {
		if up.DestinationType != "" && up.DestinationType != structs.UpstreamDestTypeService {
			continue
		}
		require.Contains(t, up.Config, "limits")
		upstreamLimits := up.Config["limits"].(*structs.UpstreamLimits)
		require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections)
		require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode)
	}
}

// TestListServiceNodes_MergeCentralConfig checks that both the
// /v1/catalog/service/:service and /v1/catalog/connect/:service endpoints
// return a single instance whose proxy config passes
// validateMergeCentralConfigResponse when ?merge-central-config is set.
func TestListServiceNodes_MergeCentralConfig(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	a := NewTestAgent(t, "")
	defer a.Shutdown()

	testrpc.WaitForLeader(t, a.RPC, "dc1")

	// Register the service
	registerServiceReq := registerService(t, a)
	// Register proxy-defaults
	proxyGlobalEntry := registerProxyDefaults(t, a)
	// Register service-defaults
	serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)

	type testCase struct {
		testCaseName string
		serviceName  string
		connect      bool
	}

	run := func(t *testing.T, tc testCase) {
		// Named reqURL (not url) so the net/url package is not shadowed.
		reqURL := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config", tc.serviceName)
		if tc.connect {
			reqURL = fmt.Sprintf("/v1/catalog/connect/%s?merge-central-config", tc.serviceName)
		}
		req, _ := http.NewRequest("GET", reqURL, nil)
		resp := httptest.NewRecorder()
		var obj interface{}
		var err error
		if tc.connect {
			obj, err = a.srv.CatalogConnectServiceNodes(resp, req)
		} else {
			obj, err = a.srv.CatalogServiceNodes(resp, req)
		}

		require.NoError(t, err)
		assertIndex(t, resp)

		serviceNodes := obj.(structs.ServiceNodes)

		// validate response
		require.Len(t, serviceNodes, 1)
		v := serviceNodes[0]

		validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
	}
	testCases := []testCase{
		{
			testCaseName: "List service instances with merge-central-config",
			serviceName:  registerServiceReq.Service.Service,
		},
		{
			testCaseName: "List connect capable service instances with merge-central-config",
			serviceName:  registerServiceReq.Service.Proxy.DestinationServiceName,
			connect:      true,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.testCaseName, func(t *testing.T) {
			run(t, tc)
		})
	}
}

// TestCatalogServiceNodes_MergeCentralConfigBlocking issues a blocking
// /v1/catalog/service/:service?merge-central-config query, registers
// service-defaults from a background goroutine roughly 100ms later, and
// checks that the query returns a newer index with the merged config.
func TestCatalogServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	t.Parallel()
	a := NewTestAgent(t, "")
	defer a.Shutdown()

	testrpc.WaitForLeader(t, a.RPC, "dc1")

	// Register the service
	registerServiceReq := registerService(t, a)
	// Register proxy-defaults
	proxyGlobalEntry := registerProxyDefaults(t, a)

	// Run the query
	rpcReq := structs.ServiceSpecificRequest{
		Datacenter:         "dc1",
		ServiceName:        registerServiceReq.Service.Service,
		MergeCentralConfig: true,
	}
	var rpcResp structs.IndexedServiceNodes
	require.NoError(t, a.RPC(context.Background(), "Catalog.ServiceNodes", &rpcReq, &rpcResp))

	require.Len(t, rpcResp.ServiceNodes, 1)
	serviceNode := rpcResp.ServiceNodes[0]
	require.Equal(t, registerServiceReq.Service.Service, serviceNode.ServiceName)
	// validate proxy global defaults are resolved in the merged service config
	require.Equal(t, proxyGlobalEntry.Config, serviceNode.ServiceProxy.Config)
	require.Equal(t, proxyGlobalEntry.Mode, serviceNode.ServiceProxy.Mode)

	// Async cause a change - register service defaults
	waitIndex := rpcResp.Index
	start := time.Now()
	// The registered entry is handed back over a channel rather than through
	// a shared variable: the blocking query below can return before the
	// goroutine has stored its result, so a plain variable would be racy.
	// Closing the channel on exit keeps the receive from hanging if the
	// helper aborts the goroutine before sending.
	// NOTE(review): registerServiceDefaults receives t from a non-test
	// goroutine; confirm it does not rely on FailNow semantics.
	entryCh := make(chan structs.ServiceConfigEntry, 1)
	go func() {
		defer close(entryCh)
		time.Sleep(100 * time.Millisecond)
		// Register service-defaults
		entryCh <- registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
	}()

	const waitDuration = 3 * time.Second
RUN_BLOCKING_QUERY:
	reqURL := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config&wait=%s&index=%d",
		registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
	req, _ := http.NewRequest("GET", reqURL, nil)
	resp := httptest.NewRecorder()
	obj, err := a.srv.CatalogServiceNodes(resp, req)

	require.NoError(t, err)
	assertIndex(t, resp)

	elapsed := time.Since(start)
	idx := getIndex(t, resp)
	if idx < waitIndex {
		t.Fatalf("bad index returned: %v", idx)
	} else if idx == waitIndex {
		if elapsed > waitDuration {
			// This should prevent the loop from running longer than the waitDuration
			t.Fatalf("too slow: %v", elapsed)
		}
		goto RUN_BLOCKING_QUERY
	}
	// Should block at least 100ms before getting the changed results
	if elapsed < 100*time.Millisecond {
		t.Fatalf("too fast: %v", elapsed)
	}

	// Wait for the goroutine to publish the entry it registered.
	serviceDefaultsConfigEntry, ok := <-entryCh
	require.True(t, ok, "service-defaults registration did not complete")

	serviceNodes := obj.(structs.ServiceNodes)

	// validate response
	require.Len(t, serviceNodes, 1)
	v := serviceNodes[0]

	validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
}

// Test that the Connect-compatible endpoints can be queried for a
// service via /v1/catalog/connect/:service.
func TestCatalogConnectServiceNodes_good(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") // Register args := structs.TestRegisterRequestProxy(t) args.Service.Address = "127.0.0.55" var out struct{} assert.Nil(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", fmt.Sprintf( "/v1/catalog/connect/%s", args.Service.Proxy.DestinationServiceName), nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogConnectServiceNodes(resp, req) assert.Nil(t, err) assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) assert.Len(t, nodes, 1) assert.Equal(t, structs.ServiceKindConnectProxy, nodes[0].ServiceKind) assert.Equal(t, args.Service.Address, nodes[0].ServiceAddress) assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy) } func TestCatalogConnectServiceNodes_Filter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") // Register args := structs.TestRegisterRequestProxy(t) args.Service.Address = "127.0.0.55" var out struct{} require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) args = structs.TestRegisterRequestProxy(t) args.Service.Address = "127.0.0.55" args.Service.Meta = map[string]string{ "version": "2", } args.Service.ID = "web-proxy2" args.SkipNodeUpdate = true require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", fmt.Sprintf( "/v1/catalog/connect/%s?filter=%s", args.Service.Proxy.DestinationServiceName, url.QueryEscape("ServiceMeta.version == 2")), nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogConnectServiceNodes(resp, req) require.NoError(t, err) assertIndex(t, resp) nodes := obj.(structs.ServiceNodes) require.Len(t, nodes, 1) require.Equal(t, structs.ServiceKindConnectProxy, 
nodes[0].ServiceKind) require.Equal(t, args.Service.Address, nodes[0].ServiceAddress) require.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy) } func TestCatalogNodeServices(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register node with a regular service and connect proxy args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", Tags: []string{"a"}, }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } // Register a connect proxy args.Service = structs.TestNodeServiceProxy(t) require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc1", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodeServices(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) services := obj.(*structs.NodeServices) if len(services.Services) != 2 { t.Fatalf("bad: %v", obj) } // Proxy service should have it's config intact require.Equal(t, args.Service.Proxy, services.Services["web-proxy"].Proxy) } func TestCatalogNodeServiceList(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register node with a regular service and connect proxy args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", Tags: []string{"a"}, }, } var out struct{} if err := a.RPC(context.Background(), "Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) } // Register a connect proxy args.Service = structs.TestNodeServiceProxy(t) require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, 
&out)) req, _ := http.NewRequest("GET", "/v1/catalog/node-services/foo?dc=dc1", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodeServiceList(resp, req) if err != nil { t.Fatalf("err: %v", err) } assertIndex(t, resp) services := obj.(*structs.NodeServiceList) if len(services.Services) != 2 { t.Fatalf("bad: %v", obj) } var proxySvc *structs.NodeService for _, svc := range services.Services { if svc.ID == "web-proxy" { proxySvc = svc } } require.NotNil(t, proxySvc, "Missing proxy service registration") // Proxy service should have it's config intact require.Equal(t, args.Service.Proxy, proxySvc.Proxy) } func TestCatalogNodeServiceList_MergeCentralConfig(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the service registerServiceReq := registerService(t, a) // Register proxy-defaults proxyGlobalEntry := registerProxyDefaults(t, a) // Register service-defaults serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config", registerServiceReq.Node) req, _ := http.NewRequest("GET", url, nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodeServiceList(resp, req) require.NoError(t, err) assertIndex(t, resp) nodeServices := obj.(*structs.NodeServiceList) // validate response require.Len(t, nodeServices.Services, 1) validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) } func TestCatalogNodeServiceList_MergeCentralConfigBlocking(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the service registerServiceReq := registerService(t, a) // Register 
proxy-defaults proxyGlobalEntry := registerProxyDefaults(t, a) // Run the query rpcReq := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: registerServiceReq.Node, MergeCentralConfig: true, } var rpcResp structs.IndexedNodeServiceList require.NoError(t, a.RPC(context.Background(), "Catalog.NodeServiceList", &rpcReq, &rpcResp)) require.Len(t, rpcResp.NodeServices.Services, 1) nodeService := rpcResp.NodeServices.Services[0] require.Equal(t, registerServiceReq.Service.Service, nodeService.Service) // validate proxy global defaults are resolved in the merged service config require.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config) require.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode) // Async cause a change - register service defaults waitIndex := rpcResp.Index start := time.Now() var serviceDefaultsConfigEntry structs.ServiceConfigEntry go func() { time.Sleep(100 * time.Millisecond) // Register service-defaults serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) }() const waitDuration = 3 * time.Second RUN_BLOCKING_QUERY: url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config&wait=%s&index=%d", registerServiceReq.Node, waitDuration.String(), waitIndex) req, _ := http.NewRequest("GET", url, nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodeServiceList(resp, req) require.NoError(t, err) assertIndex(t, resp) elapsed := time.Since(start) idx := getIndex(t, resp) if idx < waitIndex { t.Fatalf("bad index returned: %v", idx) } else if idx == waitIndex { if elapsed > waitDuration { // This should prevent the loop from running longer than the waitDuration t.Fatalf("too slow: %v", elapsed) } goto RUN_BLOCKING_QUERY } // Should block at least 100ms before getting the changed results if elapsed < 100*time.Millisecond { t.Fatalf("too fast: %v", elapsed) } nodeServices := obj.(*structs.NodeServiceList) // validate response require.Len(t, nodeServices.Services, 1) 
validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) } func TestCatalogNodeServices_Filter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register node with a regular service and connect proxy args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", Service: &structs.NodeService{ Service: "api", Tags: []string{"a"}, }, } var out struct{} require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) // Register a connect proxy args.Service = structs.TestNodeServiceProxy(t) require.NoError(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc1&filter="+url.QueryEscape("Kind == `connect-proxy`"), nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodeServices(resp, req) require.NoError(t, err) assertIndex(t, resp) services := obj.(*structs.NodeServices) require.Len(t, services.Services, 1) // Proxy service should have it's config intact require.Equal(t, args.Service.Proxy, services.Services["web-proxy"].Proxy) } // Test that the services on a node contain all the Connect proxies on // the node as well with their fields properly populated. 
func TestCatalogNodeServices_ConnectProxy(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register args := structs.TestRegisterRequestProxy(t) var out struct{} assert.Nil(t, a.RPC(context.Background(), "Catalog.Register", args, &out)) req, _ := http.NewRequest("GET", fmt.Sprintf( "/v1/catalog/node/%s", args.Node), nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogNodeServices(resp, req) assert.Nil(t, err) assertIndex(t, resp) ns := obj.(*structs.NodeServices) assert.Len(t, ns.Services, 1) v := ns.Services[args.Service.Service] assert.Equal(t, structs.ServiceKindConnectProxy, v.Kind) } func TestCatalogNodeServices_WanTranslation(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a1 := NewTestAgent(t, ` datacenter = "dc1" translate_wan_addrs = true acl_datacenter = "" `) defer a1.Shutdown() testrpc.WaitForTestAgent(t, a1.RPC, "dc1") a2 := NewTestAgent(t, ` datacenter = "dc2" translate_wan_addrs = true acl_datacenter = "" `) defer a2.Shutdown() testrpc.WaitForTestAgent(t, a2.RPC, "dc2") // Wait for the WAN join. addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN) _, err := a2.srv.agent.JoinWAN([]string{addr}) require.NoError(t, err) retry.Run(t, func(r *retry.R) { require.Len(r, a1.WANMembers(), 2) }) // Register a node with DC2. { args := &structs.RegisterRequest{ Datacenter: "dc2", Node: "foo", Address: "127.0.0.1", TaggedAddresses: map[string]string{ "wan": "127.0.0.2", }, Service: &structs.NodeService{ Service: "http_wan_translation_test", Address: "127.0.0.1", Port: 8080, TaggedAddresses: map[string]structs.ServiceAddress{ "wan": { Address: "1.2.3.4", Port: 80, }, }, }, } var out struct{} require.NoError(t, a2.RPC(context.Background(), "Catalog.Register", args, &out)) } // Query for the node in DC2 from DC1. 
req, _ := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc2", nil) resp1 := httptest.NewRecorder() obj1, err1 := a1.srv.CatalogNodeServices(resp1, req) require.NoError(t, err1) require.NoError(t, checkIndex(resp1)) // Expect that DC1 gives us a WAN address (since the node is in DC2). service1, ok := obj1.(*structs.NodeServices) require.True(t, ok, "obj1 is not a *structs.NodeServices") require.NotNil(t, service1.Node) require.Equal(t, service1.Node.Address, "127.0.0.2") require.Len(t, service1.Services, 1) ns1, ok := service1.Services["http_wan_translation_test"] require.True(t, ok, "Missing service http_wan_translation_test") require.Equal(t, "1.2.3.4", ns1.Address) require.Equal(t, 80, ns1.Port) // Query DC2 from DC2. resp2 := httptest.NewRecorder() obj2, err2 := a2.srv.CatalogNodeServices(resp2, req) require.NoError(t, err2) require.NoError(t, checkIndex(resp2)) // Expect that DC2 gives us a private address (since the node is in DC2). service2 := obj2.(*structs.NodeServices) require.True(t, ok, "obj2 is not a *structs.NodeServices") require.NotNil(t, service2.Node) require.Equal(t, service2.Node.Address, "127.0.0.1") require.Len(t, service2.Services, 1) ns2, ok := service2.Services["http_wan_translation_test"] require.True(t, ok, "Missing service http_wan_translation_test") require.Equal(t, ns2.Address, "127.0.0.1") require.Equal(t, ns2.Port, 8080) } func TestCatalog_GatewayServices_Terminating(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register a service to be covered by the wildcard in the config entry args := structs.TestRegisterRequest(t) args.Service.Service = "redis" args.Check = &structs.HealthCheck{ Name: "redis", Status: api.HealthPassing, ServiceID: args.Service.Service, } var out struct{} assert.NoError(t, a.RPC(context.Background(), "Catalog.Register", &args, &out)) // Associate the gateway and api/redis services 
entryArgs := &structs.ConfigEntryRequest{ Op: structs.ConfigEntryUpsert, Datacenter: "dc1", Entry: &structs.TerminatingGatewayConfigEntry{ Kind: "terminating-gateway", Name: "terminating", Services: []structs.LinkedService{ { Name: "api", CAFile: "api/ca.crt", CertFile: "api/client.crt", KeyFile: "api/client.key", SNI: "my-domain", }, { Name: "*", CAFile: "ca.crt", CertFile: "client.crt", KeyFile: "client.key", SNI: "my-alt-domain", DisableAutoHostRewrite: true, }, }, }, } var entryResp bool assert.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &entryArgs, &entryResp)) retry.Run(t, func(r *retry.R) { req, _ := http.NewRequest("GET", "/v1/catalog/gateway-services/terminating", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogGatewayServices(resp, req) assert.NoError(r, err) header := resp.Header().Get("X-Consul-Index") if header == "" || header == "0" { r.Fatalf("Bad: %v", header) } gatewayServices := obj.(structs.GatewayServices) expect := structs.GatewayServices{ { Service: structs.NewServiceName("api", nil), Gateway: structs.NewServiceName("terminating", nil), GatewayKind: structs.ServiceKindTerminatingGateway, CAFile: "api/ca.crt", CertFile: "api/client.crt", KeyFile: "api/client.key", SNI: "my-domain", AutoHostRewrite: true, }, { Service: structs.NewServiceName("redis", nil), Gateway: structs.NewServiceName("terminating", nil), GatewayKind: structs.ServiceKindTerminatingGateway, CAFile: "ca.crt", CertFile: "client.crt", KeyFile: "client.key", SNI: "my-alt-domain", FromWildcard: true, AutoHostRewrite: false, }, } // Ignore raft index for equality for _, s := range gatewayServices { s.RaftIndex = structs.RaftIndex{} } assert.Equal(r, expect, gatewayServices) }) } func TestCatalog_GatewayServices_Ingress(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Associate an ingress gateway with api/redis entryArgs := 
&structs.ConfigEntryRequest{ Op: structs.ConfigEntryUpsert, Datacenter: "dc1", Entry: &structs.IngressGatewayConfigEntry{ Kind: "ingress-gateway", Name: "ingress", Listeners: []structs.IngressListener{ { Port: 8888, Services: []structs.IngressService{ { Name: "api", }, }, }, { Port: 9999, Services: []structs.IngressService{ { Name: "redis", }, }, }, }, }, } var entryResp bool require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &entryArgs, &entryResp)) retry.Run(t, func(r *retry.R) { req, _ := http.NewRequest("GET", "/v1/catalog/gateway-services/ingress", nil) resp := httptest.NewRecorder() obj, err := a.srv.CatalogGatewayServices(resp, req) require.NoError(r, err) header := resp.Header().Get("X-Consul-Index") if header == "" || header == "0" { r.Fatalf("Bad: %v", header) } gatewayServices := obj.(structs.GatewayServices) expect := structs.GatewayServices{ { Service: structs.NewServiceName("api", nil), Gateway: structs.NewServiceName("ingress", nil), GatewayKind: structs.ServiceKindIngressGateway, Protocol: "tcp", Port: 8888, }, { Service: structs.NewServiceName("redis", nil), Gateway: structs.NewServiceName("ingress", nil), GatewayKind: structs.ServiceKindIngressGateway, Protocol: "tcp", Port: 9999, }, } // Ignore raft index for equality for _, s := range gatewayServices { s.RaftIndex = structs.RaftIndex{} } require.Equal(r, expect, gatewayServices) }) } func TestCatalogRegister_AssignManualServiceVIPs(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() a := NewTestAgent(t, "") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") for _, service := range []string{"api", "web"} { req := structs.ConfigEntryRequest{ Datacenter: "dc1", Entry: &structs.ServiceResolverConfigEntry{ Kind: structs.ServiceResolver, Name: service, }, } var out bool require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &req, &out)) } assignVIPs := func(req structs.AssignServiceManualVIPsRequest, expect 
structs.AssignServiceManualVIPsResponse) { httpReq, _ := http.NewRequest("PUT", "/v1/internal/service-virtual-ip", jsonReader(req)) resp := httptest.NewRecorder() obj, err := a.srv.AssignManualServiceVIPs(resp, httpReq) require.NoError(t, err) result, ok := obj.(structs.AssignServiceManualVIPsResponse) require.True(t, ok) require.Equal(t, expect, result) } // Assign some manual IPs to the service assignVIPs(structs.AssignServiceManualVIPsRequest{ Service: "api", ManualVIPs: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, }, structs.AssignServiceManualVIPsResponse{ Found: true, }) // Assign some manual IPs to the new service, reassigning one from the existing service. assignVIPs(structs.AssignServiceManualVIPsRequest{ Service: "web", ManualVIPs: []string{"2.2.2.2", "4.4.4.4"}, }, structs.AssignServiceManualVIPsResponse{ Found: true, UnassignedFrom: []structs.PeeredServiceName{ { ServiceName: structs.ServiceName{Name: "api", EnterpriseMeta: *acl.DefaultEnterpriseMeta()}, }, }, }) // Assign some manual IPs a non-existent service, should be a no-op. assignVIPs(structs.AssignServiceManualVIPsRequest{ Service: "nope", ManualVIPs: []string{"1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4"}, }, structs.AssignServiceManualVIPsResponse{ Found: false, }) }