From cdeff022ddbfbac4ece605dcf14d7ab834f2b78f Mon Sep 17 00:00:00 2001 From: David van Geest Date: Wed, 15 Jun 2016 14:02:51 -0400 Subject: [PATCH 1/5] Translate Address to tagged WAN address in HTTP API when appropriate. --- command/agent/agent.go | 12 + command/agent/catalog_endpoint.go | 19 ++ command/agent/catalog_endpoint_test.go | 237 ++++++++++++++++++ command/agent/dns.go | 20 +- command/agent/health_endpoint.go | 7 + command/agent/health_endpoint_test.go | 77 ++++++ command/agent/http_test.go | 44 ++++ command/agent/prepared_query_endpoint.go | 7 + command/agent/prepared_query_endpoint_test.go | 46 ++++ consul/state/state_store.go | 4 +- consul/structs/structs.go | 1 + 11 files changed, 457 insertions(+), 17 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 79b94d55db..fcd647bd18 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1069,6 +1069,18 @@ func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error return nil } +// TranslateAddr is used to provide the final, translated address for a node, +// depending on how this agent and the other node are configured. +func (a *Agent) TranslateAddr(dc string, addr string, taggedAddr map[string]string) string { + if a.config.TranslateWanAddrs && (a.config.Datacenter != dc) { + wanAddr := taggedAddr["wan"] + if wanAddr != "" { + addr = wanAddr + } + } + return addr +} + // persistCheckState is used to record the check status into the data dir. // This allows the state to be restored on a later agent start. Currently // only useful for TTL based checks. diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index e8df2ee17b..1aa05447c6 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -77,6 +77,12 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( if out.Nodes == nil { out.Nodes = make(structs.Nodes, 0) } + + for _, node := range out.Nodes { + addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) + node.Address = addr + } + return out.Nodes, nil } @@ -129,6 +135,12 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req if out.ServiceNodes == nil { out.ServiceNodes = make(structs.ServiceNodes, 0) } + + for _, serviceNode := range out.ServiceNodes { + addr := s.agent.TranslateAddr(args.Datacenter, serviceNode.Address, serviceNode.TaggedAddresses) + serviceNode.Address = addr + } + return out.ServiceNodes, nil } @@ -153,5 +165,12 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { return nil, err } + + if out.NodeServices != nil { + node := out.NodeServices.Node + addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) + node.Address = addr + } + return out.NodeServices, nil } diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index f4ea16d45f..ab78306b2d 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -145,6 +145,95 @@ func TestCatalogNodes(t *testing.T) { } } +func TestCatalogNodes_WanTranslation(t *testing.T) { + httpCtx1, httpCtx2 := setupWanHTTPServers(t) + defer shutdownHTTPServer(httpCtx1) + defer shutdownHTTPServer(httpCtx2) + srv1 := httpCtx1.srv + srv2 := httpCtx2.srv + + // Register a node with DC2 + { + args := &structs.RegisterRequest{ + Datacenter: "dc2", + Node: "wan_translation_test", + Address: "127.0.0.1", + TaggedAddresses: map[string]string{ + "wan": "127.0.0.2", + }, + Service: &structs.NodeService{ + Service: "http_wan_translation_test", + }, + } + + var out struct{} + if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + req, err := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc2", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // get nodes for DC2 from DC1 + resp1 := httptest.NewRecorder() + obj1, err1 := srv1.CatalogNodes(resp1, req) + if err1 != nil { + t.Fatalf("err: %v", err1) + } + + // Verify an index is set + assertIndex(t, resp1) + + nodes1 := obj1.(structs.Nodes) + if len(nodes1) != 2 { + t.Fatalf("bad: %v", obj1) + } + + var node1 *structs.Node + + for _, node := range nodes1 { + if node.Node == "wan_translation_test" { + node1 = node + } + } + + // Expect that DC1 gives us a public address (since the node is in DC2) + if node1.Address != "127.0.0.2" { + t.Fatalf("bad: %v", node1) + } + + // get nodes for DC2 from DC2 + resp2 := httptest.NewRecorder() + obj2, err2 := srv2.CatalogNodes(resp2, req) + if err2 != nil { + t.Fatalf("err: %v", err2) + } + + // Verify an index is set + assertIndex(t, resp2) + + nodes2 := obj2.(structs.Nodes) + if len(nodes2) != 2 { + t.Fatalf("bad: %v", obj2) + } + + var node2 *structs.Node + + for _, node := range nodes2 { + if node.Node == "wan_translation_test" { + node2 = node + } + } + + // Expect that DC2 gives us a private address (since the node is in DC2) + if node2.Address != "127.0.0.1" { + t.Fatalf("bad: %v", node2) + } +} + func TestCatalogNodes_Blocking(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) @@ -407,6 +496,81 @@ func TestCatalogServiceNodes(t *testing.T) { } } +func TestCatalogServiceNodes_WanTranslation(t *testing.T) { + httpCtx1, httpCtx2 := setupWanHTTPServers(t) + defer shutdownHTTPServer(httpCtx1) + defer shutdownHTTPServer(httpCtx2) + srv1 := httpCtx1.srv + srv2 := httpCtx2.srv + + // Register a node with DC2 + { + args := &structs.RegisterRequest{ + Datacenter: "dc2", + Node: "foo", + Address: "127.0.0.1", + TaggedAddresses: map[string]string{ + "wan": "127.0.0.2", + }, + Service: &structs.NodeService{ + Service: "http_wan_translation_test", + }, + } + + var out struct{} + if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + req, err := http.NewRequest("GET", "/v1/catalog/service/http_wan_translation_test?dc=dc2", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ask HTTP server on DC1 for the node + resp1 := httptest.NewRecorder() + obj1, err1 := srv1.CatalogServiceNodes(resp1, req) + if err1 != nil { + t.Fatalf("err: %v", err1) + } + + assertIndex(t, resp1) + + nodes1 := obj1.(structs.ServiceNodes) + if len(nodes1) != 1 { + t.Fatalf("bad: %v", obj1) + } + + node1 := nodes1[0] + + // Expect that DC1 gives us a public address (since the node is in DC2) + if node1.Address != "127.0.0.2" { + t.Fatalf("bad: %v", node1) + } + + // Ask HTTP server on DC2 for the node + resp2 := httptest.NewRecorder() + obj2, err2 := srv2.CatalogServiceNodes(resp2, req) + if err2 != nil { + t.Fatalf("err: %v", err2) + } + + assertIndex(t, resp2) + + nodes2 := obj2.(structs.ServiceNodes) + if len(nodes2) != 1 { + t.Fatalf("bad: %v", obj2) + } + + node2 := nodes2[0] + + // Expect that DC2 gives us a local address (since the node is in DC2) + if node2.Address != "127.0.0.1" { + t.Fatalf("bad: %v", node2) + } +} + func TestCatalogServiceNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) @@ -550,3 +714,76 @@ func TestCatalogNodeServices(t *testing.T) { t.Fatalf("bad: %v", obj) } } + +func TestCatalogNodeServices_WanTranslation(t *testing.T) { + httpCtx1, httpCtx2 := setupWanHTTPServers(t) + defer shutdownHTTPServer(httpCtx1) + defer shutdownHTTPServer(httpCtx2) + srv1 := httpCtx1.srv + srv2 := httpCtx2.srv + + // Register a node with DC2 + { + args := &structs.RegisterRequest{ + Datacenter: "dc2", + Node: "foo", + Address: "127.0.0.1", + TaggedAddresses: map[string]string{ + "wan": "127.0.0.2", + }, + Service: &structs.NodeService{ + Service: "http_wan_translation_test", + }, + } + + var out struct{} + if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + req, err := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc2", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // ask DC1 for node in DC2 + resp1 := httptest.NewRecorder() + obj1, err1 := srv1.CatalogNodeServices(resp1, req) + if err1 != nil { + t.Fatalf("err: %v", err1) + } + assertIndex(t, resp1) + + services1 := obj1.(*structs.NodeServices) + if len(services1.Services) != 1 { + t.Fatalf("bad: %v", obj1) + } + + service1 := services1.Node + + // Expect that DC1 gives us a public address (since the node is in DC2) + if service1.Address != "127.0.0.2" { + t.Fatalf("bad: %v", service1) + } + + // ask DC2 for node in DC2 + resp2 := httptest.NewRecorder() + obj2, err2 := srv2.CatalogNodeServices(resp2, req) + if err2 != nil { + t.Fatalf("err: %v", err2) + } + assertIndex(t, resp2) + + services2 := obj2.(*structs.NodeServices) + if len(services2.Services) != 1 { + t.Fatalf("bad: %v", obj2) + } + + service2 := services2.Node + + // Expect that DC2 gives us a private address (since the node is in DC2) + if service2.Address != "127.0.0.1" { + t.Fatalf("bad: %v", service2) + } +} diff --git a/command/agent/dns.go b/command/agent/dns.go index 3755bc0502..1a8eec1442 100644 --- a/command/agent/dns.go +++ b/command/agent/dns.go @@ -370,19 +370,6 @@ INVALID: resp.SetRcode(req, dns.RcodeNameError) } -// translateAddr is used to provide the final, translated address for a node, -// depending on how this agent and the other node are configured. -func (d *DNSServer) translateAddr(dc string, node *structs.Node) string { - addr := node.Address - if d.agent.config.TranslateWanAddrs && (d.agent.config.Datacenter != dc) { - wanAddr := node.TaggedAddresses["wan"] - if wanAddr != "" { - addr = wanAddr - } - } - return addr -} - // nodeLookup is used to handle a node query func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns.Msg) { // Only handle ANY, A and AAAA type requests @@ -423,7 +410,8 @@ RPC: } // Add the node record - addr := d.translateAddr(datacenter, out.NodeServices.Node) + n := out.NodeServices.Node + addr := d.agent.TranslateAddr(datacenter, n.Address, n.TaggedAddresses) records := d.formatNodeRecord(out.NodeServices.Node, addr, req.Question[0].Name, qType, d.config.NodeTTL) if records != nil { @@ -776,7 +764,7 @@ func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNode for _, node := range nodes { // Start with the translated address but use the service address, // if specified. - addr := d.translateAddr(dc, node.Node) + addr := d.agent.TranslateAddr(dc, node.Node.Address, node.Node.TaggedAddresses) if node.Service.Address != "" { addr = node.Service.Address } @@ -825,7 +813,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes // Start with the translated address but use the service address, // if specified. - addr := d.translateAddr(dc, node.Node) + addr := d.agent.TranslateAddr(dc, node.Node.Address, node.Node.TaggedAddresses) if node.Service.Address != "" { addr = node.Service.Address } diff --git a/command/agent/health_endpoint.go b/command/agent/health_endpoint.go index b252a4126a..f2ddbafb9c 100644 --- a/command/agent/health_endpoint.go +++ b/command/agent/health_endpoint.go @@ -143,6 +143,13 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ if out.Nodes == nil { out.Nodes = make(structs.CheckServiceNodes, 0) } + + for _, checkServiceNode := range out.Nodes { + node := checkServiceNode.Node + addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) + node.Address = addr + } + return out.Nodes, nil } diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index 7bcbc91169..584a0ae2e6 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -554,6 +554,83 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) { } } +func TestHealthServiceNodes_WanTranslation(t *testing.T) { + httpCtx1, httpCtx2 := setupWanHTTPServers(t) + defer shutdownHTTPServer(httpCtx1) + defer shutdownHTTPServer(httpCtx2) + srv1 := httpCtx1.srv + srv2 := httpCtx2.srv + + // Register a node with DC2 + { + args := &structs.RegisterRequest{ + Datacenter: "dc2", + Node: "foo", + Address: "127.0.0.1", + TaggedAddresses: map[string]string{ + "wan": "127.0.0.2", + }, + Service: &structs.NodeService{ + Service: "http_wan_translation_test", + }, + } + + var out struct{} + if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + req, err := http.NewRequest("GET", "/v1/health/service/http_wan_translation_test?dc=dc2", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // ask DC1 for node in DC2 + resp1 := httptest.NewRecorder() + obj1, err1 := srv1.HealthServiceNodes(resp1, req) + if err1 != nil { + t.Fatalf("err: %v", err1) + } + + assertIndex(t, resp1) + + // Should be 1 health check for consul + nodes1 := obj1.(structs.CheckServiceNodes) + if len(nodes1) != 1 { + t.Fatalf("bad: %v", obj1) + } + + node1 := nodes1[0].Node + + // Expect that DC1 gives us a public address (since the node is in DC2) + if node1.Address != "127.0.0.2" { + t.Fatalf("bad: %v", node1) + } + + // ask DC2 for node in DC2 + resp2 := httptest.NewRecorder() + obj2, err2 := srv2.HealthServiceNodes(resp2, req) + if err2 != nil { + t.Fatalf("err: %v", err2) + } + + assertIndex(t, resp2) + + // Should be 1 health check for consul + nodes2 := obj2.(structs.CheckServiceNodes) + if len(nodes2) != 1 { + t.Fatalf("bad: %v", obj2) + } + + node2 := nodes2[0].Node + + // Expect that DC2 gives us a private address (since the node is in DC2) + if node2.Address != "127.0.0.1" { + t.Fatalf("bad: %v", node2) + } +} + func TestFilterNonPassing(t *testing.T) { nodes := structs.CheckServiceNodes{ structs.CheckServiceNode{ diff --git a/command/agent/http_test.go b/command/agent/http_test.go index b6618977f3..7352c6186c 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -23,6 +23,50 @@ import ( "github.com/hashicorp/go-cleanhttp" ) +type HTTPServerCtx struct { + dir string + srv *HTTPServer +} + +func setupWanHTTPServers(t *testing.T) (HTTPServerCtx, HTTPServerCtx) { + dir1, srv1 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc1" + c.TranslateWanAddrs = true + }) + + dir2, srv2 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc2" + c.TranslateWanAddrs = true + }) + + testutil.WaitForLeader(t, srv1.agent.RPC, "dc1") + testutil.WaitForLeader(t, srv2.agent.RPC, "dc2") + + // Join WAN cluster + addr := fmt.Sprintf("127.0.0.1:%d", + srv1.agent.config.Ports.SerfWan) + if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult( + func() (bool, error) { + return len(srv1.agent.WANMembers()) > 1, nil + }, + func(err error) { + t.Fatalf("Failed waiting for WAN join: %v", err) + }) + return HTTPServerCtx{dir1, srv1}, HTTPServerCtx{dir2, srv2} +} + +func shutdownHTTPServer(httpCtx HTTPServerCtx) { + os.RemoveAll(httpCtx.dir) + httpCtx.srv.Shutdown() + httpCtx.srv.agent.Shutdown() +} + func makeHTTPServer(t *testing.T) (string, *HTTPServer) { return makeHTTPServerWithConfig(t, nil) } diff --git a/command/agent/prepared_query_endpoint.go b/command/agent/prepared_query_endpoint.go index 1a6ff6d72e..df6f51f4d4 100644 --- a/command/agent/prepared_query_endpoint.go +++ b/command/agent/prepared_query_endpoint.go @@ -126,6 +126,13 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r if reply.Nodes == nil { reply.Nodes = make(structs.CheckServiceNodes, 0) } + + for _, checkServiceNode := range reply.Nodes { + node := checkServiceNode.Node + addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) + node.Address = addr + } + return reply, nil } diff --git a/command/agent/prepared_query_endpoint_test.go b/command/agent/prepared_query_endpoint_test.go index ff757e0acf..ceac2f4408 100644 --- a/command/agent/prepared_query_endpoint_test.go +++ b/command/agent/prepared_query_endpoint_test.go @@ -359,6 +359,52 @@ func TestPreparedQuery_Execute(t *testing.T) { } }) + // testing WAN translation in the response + httpTestWithConfig(t, func(srv *HTTPServer) { + m := MockPreparedQuery{} + if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil { + t.Fatalf("err: %v", err) + } + + m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { + nodesResponse := make(structs.CheckServiceNodes, 1) + nodesResponse[0].Node = &structs.Node{Node: "foo", Address: "127.0.0.1", + TaggedAddresses: map[string]string{"wan": "127.0.0.2"}} + reply.Nodes = nodesResponse + return nil + } + + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/query/my-id/execute?dc=dc2", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.PreparedQuerySpecific(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 200 { + t.Fatalf("bad code: %d", resp.Code) + } + r, ok := obj.(structs.PreparedQueryExecuteResponse) + if !ok { + t.Fatalf("unexpected: %T", obj) + } + if r.Nodes == nil || len(r.Nodes) != 1 { + t.Fatalf("bad: %v", r) + } + + node := r.Nodes[0] + if node.Node.Address != "127.0.0.2" { + t.Fatalf("bad: %v", node.Node) + } + }, func(c *Config) { + c.Datacenter = "dc1" + c.TranslateWanAddrs = true + }) + httpTest(t, func(srv *HTTPServer) { body := bytes.NewBuffer(nil) req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 7c48a1feee..0f1d95fb90 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -839,7 +839,9 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo if err != nil { return nil, fmt.Errorf("failed node lookup: %s", err) } - s.Address = n.(*structs.Node).Address + node := n.(*structs.Node) + s.Address = node.Address + s.TaggedAddresses = node.TaggedAddresses results = append(results, s) } return results, nil diff --git a/consul/structs/structs.go b/consul/structs/structs.go index c9ee486fc1..23c4c78eb4 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -264,6 +264,7 @@ type Services map[string][]string type ServiceNode struct { Node string Address string + TaggedAddresses map[string]string ServiceID string ServiceName string ServiceTags []string From f7eaa066167d0a29a013125bc504ef93d664e5bd Mon Sep 17 00:00:00 2001 From: James Phillips Date: Fri, 12 Aug 2016 16:28:56 -0700 Subject: [PATCH 2/5] Makes the filled-in parts of ServiceNode more explicit. --- consul/state/state_store.go | 19 +++++++++++++------ consul/structs/structs.go | 23 +++++++++++++++-------- consul/structs/structs_test.go | 28 +++++++++++++++++++++++----- 3 files changed, 51 insertions(+), 19 deletions(-) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 0f1d95fb90..7949d89f12 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -678,9 +678,11 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa return fmt.Errorf("failed service lookup: %s", err) } - // Create the service node entry and populate the indexes. We leave the - // address blank and fill that in on the way out during queries. - entry := svc.ToServiceNode(node, "") + // Create the service node entry and populate the indexes. Note that + // conversion doesn't populate any of the node-specific information + // (Address and TaggedAddresses). That's always populated when we read + // from the state store. + entry := svc.ToServiceNode(node) if existing != nil { entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex entry.ModifyIndex = idx @@ -830,18 +832,23 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo var results structs.ServiceNodes for _, sn := range services { // Note that we have to clone here because we don't want to - // modify the address field on the object in the database, + // modify the node-related fields on the object in the database, // which is what we are referencing. - s := sn.Clone() + s := sn.PartialClone() - // Fill in the address of the node. + // Grab the corresponding node record. n, err := tx.First("nodes", "id", sn.Node) if err != nil { return nil, fmt.Errorf("failed node lookup: %s", err) } + + // Populate the node-related fields. The tagged addresses may be + // used by agents to perform address translation if they are + // configured to do that. node := n.(*structs.Node) s.Address = node.Address s.TaggedAddresses = node.TaggedAddresses + results = append(results, s) } return results, nil diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 23c4c78eb4..f3f8ff9ea6 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -260,7 +260,11 @@ type Nodes []*Node // Maps service name to available tags type Services map[string][]string -// ServiceNode represents a node that is part of a service +// ServiceNode represents a node that is part of a service. Address and +// TaggedAddresses are node-related fields that are always empty in the state +// store and are filled in on the way out by parseServiceNodes(). This is also +// why PartialClone() skips them, because we know they are blank already so it +// would be a waste of time to copy them. type ServiceNode struct { Node string Address string @@ -275,14 +279,16 @@ type ServiceNode struct { RaftIndex } -// Clone returns a clone of the given service node. -func (s *ServiceNode) Clone() *ServiceNode { +// PartialClone() returns a clone of the given service node, minus the node- +// related fields that get filled in later, Address and TaggedAddresses. +func (s *ServiceNode) PartialClone() *ServiceNode { tags := make([]string, len(s.ServiceTags)) copy(tags, s.ServiceTags) return &ServiceNode{ - Node: s.Node, - Address: s.Address, + Node: s.Node, + // Skip Address, see above. + // Skip TaggedAddresses, see above. ServiceID: s.ServiceID, ServiceName: s.ServiceName, ServiceTags: tags, @@ -344,10 +350,11 @@ func (s *NodeService) IsSame(other *NodeService) bool { } // ToServiceNode converts the given node service to a service node. -func (s *NodeService) ToServiceNode(node, address string) *ServiceNode { +func (s *NodeService) ToServiceNode(node string) *ServiceNode { return &ServiceNode{ - Node: node, - Address: address, + Node: node, + // Skip Address, see ServiceNode definition. + // Skip TaggedAddresses, see ServiceNode definition. ServiceID: s.ID, ServiceName: s.Service, ServiceTags: s.Tags, diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index 8caa66ffe8..11c5b2aea3 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -108,8 +108,11 @@ func TestStructs_ACL_IsSame(t *testing.T) { // testServiceNode gives a fully filled out ServiceNode instance. func testServiceNode() *ServiceNode { return &ServiceNode{ - Node: "node1", - Address: "127.0.0.1", + Node: "node1", + Address: "127.0.0.1", + TaggedAddresses: map[string]string{ + "hello": "world", + }, ServiceID: "service1", ServiceName: "dogs", ServiceTags: []string{"prod", "v1"}, @@ -123,10 +126,20 @@ func testServiceNode() *ServiceNode { } } -func TestStructs_ServiceNode_Clone(t *testing.T) { +func TestStructs_ServiceNode_PartialClone(t *testing.T) { sn := testServiceNode() - clone := sn.Clone() + clone := sn.PartialClone() + + // Make sure the parts that weren't supposed to be cloned didn't get + // copied over, then zero-value them out so we can do a DeepEqual() on + // the rest of the contents. + if clone.Address != "" || len(clone.TaggedAddresses) != 0 { + t.Fatalf("bad: %v", clone) + } + + sn.Address = "" + sn.TaggedAddresses = nil if !reflect.DeepEqual(sn, clone) { t.Fatalf("bad: %v", clone) } @@ -140,7 +153,12 @@ func TestStructs_ServiceNode_Clone(t *testing.T) { func TestStructs_ServiceNode_Conversions(t *testing.T) { sn := testServiceNode() - sn2 := sn.ToNodeService().ToServiceNode("node1", "127.0.0.1") + sn2 := sn.ToNodeService().ToServiceNode("node1") + + // These two fields get lost in the conversion, so we have to zero-value + // them out before we do the compare. + sn.Address = "" + sn.TaggedAddresses = nil if !reflect.DeepEqual(sn, sn2) { t.Fatalf("bad: %v", sn2) } From c0ff41265038bdaf9c4c27ce87d4200bbb345993 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 15 Aug 2016 15:05:02 -0700 Subject: [PATCH 3/5] Factors translate out into a separate file and makes safe for in-memory RPCs. --- command/agent/agent.go | 12 ----- command/agent/catalog_endpoint.go | 24 +++------ command/agent/dns.go | 6 +-- command/agent/health_endpoint.go | 9 ++-- command/agent/prepared_query_endpoint.go | 13 +++-- command/agent/translate_addr.go | 67 ++++++++++++++++++++++++ 6 files changed, 85 insertions(+), 46 deletions(-) create mode 100644 command/agent/translate_addr.go diff --git a/command/agent/agent.go b/command/agent/agent.go index fcd647bd18..79b94d55db 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1069,18 +1069,6 @@ func (a *Agent) UpdateCheck(checkID types.CheckID, status, output string) error return nil } -// TranslateAddr is used to provide the final, translated address for a node, -// depending on how this agent and the other node are configured. -func (a *Agent) TranslateAddr(dc string, addr string, taggedAddr map[string]string) string { - if a.config.TranslateWanAddrs && (a.config.Datacenter != dc) { - wanAddr := taggedAddr["wan"] - if wanAddr != "" { - addr = wanAddr - } - } - return addr -} - // persistCheckState is used to record the check status into the data dir. // This allows the state to be restored on a later agent start. Currently // only useful for TTL based checks. diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 1aa05447c6..efccfba73c 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -2,9 +2,10 @@ package agent import ( "fmt" - "github.com/hashicorp/consul/consul/structs" "net/http" "strings" + + "github.com/hashicorp/consul/consul/structs" ) func (s *HTTPServer) CatalogRegister(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -72,17 +73,12 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil { return nil, err } + translateAddresses(s.agent.config, args.Datacenter, out.Nodes) // Use empty list instead of nil if out.Nodes == nil { out.Nodes = make(structs.Nodes, 0) } - - for _, node := range out.Nodes { - addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) - node.Address = addr - } - return out.Nodes, nil } @@ -130,17 +126,12 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { return nil, err } + translateAddresses(s.agent.config, args.Datacenter, out.ServiceNodes) // Use empty list instead of nil if out.ServiceNodes == nil { out.ServiceNodes = make(structs.ServiceNodes, 0) } - - for _, serviceNode := range out.ServiceNodes { - addr := s.agent.TranslateAddr(args.Datacenter, serviceNode.Address, serviceNode.TaggedAddresses) - serviceNode.Address = addr - } - return out.ServiceNodes, nil } @@ -165,11 +156,8 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { return nil, err } - - if out.NodeServices != nil { - node := out.NodeServices.Node - addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) - node.Address = addr + if out.NodeServices != nil && out.NodeServices.Node != nil { + translateAddresses(s.agent.config, args.Datacenter, out.NodeServices.Node) } return out.NodeServices, nil diff --git a/command/agent/dns.go b/command/agent/dns.go index 1a8eec1442..e75c5a7913 100644 --- a/command/agent/dns.go +++ b/command/agent/dns.go @@ -411,7 +411,7 @@ RPC: // Add the node record n := out.NodeServices.Node - addr := d.agent.TranslateAddr(datacenter, n.Address, n.TaggedAddresses) + addr := translateAddress(d.agent.config, datacenter, n.Address, n.TaggedAddresses) records := d.formatNodeRecord(out.NodeServices.Node, addr, req.Question[0].Name, qType, d.config.NodeTTL) if records != nil { @@ -764,7 +764,7 @@ func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNode for _, node := range nodes { // Start with the translated address but use the service address, // if specified. - addr := d.agent.TranslateAddr(dc, node.Node.Address, node.Node.TaggedAddresses) + addr := translateAddress(d.agent.config, dc, node.Node.Address, node.Node.TaggedAddresses) if node.Service.Address != "" { addr = node.Service.Address } @@ -813,7 +813,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes // Start with the translated address but use the service address, // if specified. - addr := d.agent.TranslateAddr(dc, node.Node.Address, node.Node.TaggedAddresses) + addr := translateAddress(d.agent.config, dc, node.Node.Address, node.Node.TaggedAddresses) if node.Service.Address != "" { addr = node.Service.Address } diff --git a/command/agent/health_endpoint.go b/command/agent/health_endpoint.go index f2ddbafb9c..eeb26edbe4 100644 --- a/command/agent/health_endpoint.go +++ b/command/agent/health_endpoint.go @@ -131,6 +131,9 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ out.Nodes = filterNonPassing(out.Nodes) } + // Translate addresses after filtering so we don't waste effort. + translateAddresses(s.agent.config, args.Datacenter, out.Nodes) + // Use empty list instead of nil for i, _ := range out.Nodes { // TODO (slackpad) It's lame that this isn't a slice of pointers @@ -144,12 +147,6 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ out.Nodes = make(structs.CheckServiceNodes, 0) } - for _, checkServiceNode := range out.Nodes { - node := checkServiceNode.Node - addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) - node.Address = addr - } - return out.Nodes, nil } diff --git a/command/agent/prepared_query_endpoint.go b/command/agent/prepared_query_endpoint.go index df6f51f4d4..f182607f1f 100644 --- a/command/agent/prepared_query_endpoint.go +++ b/command/agent/prepared_query_endpoint.go @@ -122,17 +122,16 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r return nil, err } + // Note that we translate using the DC that the results came from, since + // a query can fail over to a different DC than where the execute request + // was sent to. That's why we use the reply's DC and not the one from + // the args. + translateAddresses(s.agent.config, reply.Datacenter, reply.Nodes) + // Use empty list instead of nil. if reply.Nodes == nil { reply.Nodes = make(structs.CheckServiceNodes, 0) } - - for _, checkServiceNode := range reply.Nodes { - node := checkServiceNode.Node - addr := s.agent.TranslateAddr(args.Datacenter, node.Address, node.TaggedAddresses) - node.Address = addr - } - return reply, nil } diff --git a/command/agent/translate_addr.go b/command/agent/translate_addr.go new file mode 100644 index 0000000000..7ca65a4932 --- /dev/null +++ b/command/agent/translate_addr.go @@ -0,0 +1,67 @@ +package agent + +import ( + "fmt" + + "github.com/hashicorp/consul/consul/structs" +) + +// translateAddress is used to provide the final, translated address for a node, +// depending on how the agent and the other node are configured. The dc +// parameter is the dc the datacenter this node is from. +func translateAddress(config *Config, dc string, addr string, taggedAddresses map[string]string) string { + if config.TranslateWanAddrs && (config.Datacenter != dc) { + wanAddr := taggedAddresses["wan"] + if wanAddr != "" { + addr = wanAddr + } + } + return addr +} + +// translateAddresses translates addresses in the given structure into the +// final, translated address, depending on how the agent and the other node are +// configured. The dc parameter is the datacenter this structure is from. +func translateAddresses(config *Config, dc string, subj interface{}) { + // CAUTION - SUBTLE! An agent running on a server can, in some cases, + // return pointers directly into the immutable state store for + // performance (it's via the in-memory RPC mechanism). It's never safe + // to modify those values, so we short circuit here so that we never + // update any structures that are from our own datacenter. This works + // for address translation because we *never* need to translate local + // addresses, but this is super subtle, so we've piped all the in-place + // address translation into this function which makes sure this check is + // done. This also happens to skip looking at any of the incoming + // structure for the common case of not needing to translate, so it will + // skip a lot of work if no translation needs to be done. + if !config.TranslateWanAddrs || (config.Datacenter == dc) { + return + } + + // Translate addresses in-place, subject to the condition checked above + // which ensures this is safe to do since we are operating on a local + // copy of the data. + switch v := subj.(type) { + case structs.CheckServiceNodes: + for _, entry := range v { + entry.Node.Address = translateAddress(config, dc, + entry.Node.Address, entry.Node.TaggedAddresses) + } + case *structs.Node: + v.Address = translateAddress(config, dc, + v.Address, v.TaggedAddresses) + case structs.Nodes: + for _, node := range v { + node.Address = translateAddress(config, dc, + node.Address, node.TaggedAddresses) + } + case structs.ServiceNodes: + for _, entry := range v { + entry.Address = translateAddress(config, dc, + entry.Address, entry.TaggedAddresses) + } + default: + panic(fmt.Errorf("Unhandled type passed to address translator: %#v", subj)) + + } +} From 55e83c9e1ce430f9f1ad9e45a4b24b8b014f3790 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 15 Aug 2016 15:34:11 -0700 Subject: [PATCH 4/5] Tweaks the WAN address translation unit tests. --- command/agent/catalog_endpoint_test.go | 180 ++++++++++++------ command/agent/health_endpoint_test.go | 57 ++++-- command/agent/http_test.go | 44 ----- command/agent/prepared_query_endpoint_test.go | 62 +++++- 4 files changed, 218 insertions(+), 125 deletions(-) diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index ab78306b2d..1d1136e1f6 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -146,13 +146,41 @@ func TestCatalogNodes(t *testing.T) { } func TestCatalogNodes_WanTranslation(t *testing.T) { - httpCtx1, httpCtx2 := setupWanHTTPServers(t) - defer shutdownHTTPServer(httpCtx1) - defer shutdownHTTPServer(httpCtx2) - srv1 := httpCtx1.srv - srv2 := httpCtx2.srv + dir1, srv1 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc1" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir1) + defer srv1.Shutdown() + defer srv1.agent.Shutdown() + testutil.WaitForLeader(t, srv1.agent.RPC, "dc1") - // Register a node with DC2 + dir2, srv2 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc2" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir2) + defer srv2.Shutdown() + defer srv2.agent.Shutdown() + testutil.WaitForLeader(t, srv2.agent.RPC, "dc2") + + // Wait for the WAN join. + addr := fmt.Sprintf("127.0.0.1:%d", + srv1.agent.config.Ports.SerfWan) + if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForResult( + func() (bool, error) { + return len(srv1.agent.WANMembers()) > 1, nil + }, + func(err error) { + t.Fatalf("Failed waiting for WAN join: %v", err) + }) + + // Register a node with DC2. { args := &structs.RegisterRequest{ Datacenter: "dc2", @@ -172,65 +200,54 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { } } + // Query nodes in DC2 from DC1. req, err := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc2", nil) if err != nil { t.Fatalf("err: %v", err) } - // get nodes for DC2 from DC1 resp1 := httptest.NewRecorder() obj1, err1 := srv1.CatalogNodes(resp1, req) if err1 != nil { t.Fatalf("err: %v", err1) } - - // Verify an index is set assertIndex(t, resp1) + // Expect that DC1 gives us a WAN address (since the node is in DC2). nodes1 := obj1.(structs.Nodes) if len(nodes1) != 2 { t.Fatalf("bad: %v", obj1) } - - var node1 *structs.Node - + var address string for _, node := range nodes1 { if node.Node == "wan_translation_test" { - node1 = node + address = node.Address } } - - // Expect that DC1 gives us a public address (since the node is in DC2) - if node1.Address != "127.0.0.2" { - t.Fatalf("bad: %v", node1) + if address != "127.0.0.2" { + t.Fatalf("bad: %s", address) } - // get nodes for DC2 from DC2 + // Query DC2 from DC2. resp2 := httptest.NewRecorder() obj2, err2 := srv2.CatalogNodes(resp2, req) if err2 != nil { t.Fatalf("err: %v", err2) } - - // Verify an index is set assertIndex(t, resp2) + // Expect that DC2 gives us a private address (since the node is in DC2). nodes2 := obj2.(structs.Nodes) if len(nodes2) != 2 { t.Fatalf("bad: %v", obj2) } - - var node2 *structs.Node - for _, node := range nodes2 { if node.Node == "wan_translation_test" { - node2 = node + address = node.Address } } - - // Expect that DC2 gives us a private address (since the node is in DC2) - if node2.Address != "127.0.0.1" { - t.Fatalf("bad: %v", node2) + if address != "127.0.0.1" { + t.Fatalf("bad: %s", address) } } @@ -497,13 +514,41 @@ func TestCatalogServiceNodes(t *testing.T) { } func TestCatalogServiceNodes_WanTranslation(t *testing.T) { - httpCtx1, httpCtx2 := setupWanHTTPServers(t) - defer shutdownHTTPServer(httpCtx1) - defer shutdownHTTPServer(httpCtx2) - srv1 := httpCtx1.srv - srv2 := httpCtx2.srv + dir1, srv1 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc1" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir1) + defer srv1.Shutdown() + defer srv1.agent.Shutdown() + testutil.WaitForLeader(t, srv1.agent.RPC, "dc1") - // Register a node with DC2 + dir2, srv2 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc2" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir2) + defer srv2.Shutdown() + defer srv2.agent.Shutdown() + testutil.WaitForLeader(t, srv2.agent.RPC, "dc2") + + // Wait for the WAN join. + addr := fmt.Sprintf("127.0.0.1:%d", + srv1.agent.config.Ports.SerfWan) + if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForResult( + func() (bool, error) { + return len(srv1.agent.WANMembers()) > 1, nil + }, + func(err error) { + t.Fatalf("Failed waiting for WAN join: %v", err) + }) + + // Register a node with DC2. { args := &structs.RegisterRequest{ Datacenter: "dc2", @@ -523,49 +568,43 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) { } } + // Query for the node in DC2 from DC1. req, err := http.NewRequest("GET", "/v1/catalog/service/http_wan_translation_test?dc=dc2", nil) if err != nil { t.Fatalf("err: %v", err) } - // Ask HTTP server on DC1 for the node resp1 := httptest.NewRecorder() obj1, err1 := srv1.CatalogServiceNodes(resp1, req) if err1 != nil { t.Fatalf("err: %v", err1) } - assertIndex(t, resp1) + // Expect that DC1 gives us a WAN address (since the node is in DC2). nodes1 := obj1.(structs.ServiceNodes) if len(nodes1) != 1 { t.Fatalf("bad: %v", obj1) } - node1 := nodes1[0] - - // Expect that DC1 gives us a public address (since the node is in DC2) if node1.Address != "127.0.0.2" { t.Fatalf("bad: %v", node1) } - // Ask HTTP server on DC2 for the node + // Query DC2 from DC2. resp2 := httptest.NewRecorder() obj2, err2 := srv2.CatalogServiceNodes(resp2, req) if err2 != nil { t.Fatalf("err: %v", err2) } - assertIndex(t, resp2) + // Expect that DC2 gives us a local address (since the node is in DC2). nodes2 := obj2.(structs.ServiceNodes) if len(nodes2) != 1 { t.Fatalf("bad: %v", obj2) } - node2 := nodes2[0] - - // Expect that DC2 gives us a local address (since the node is in DC2) if node2.Address != "127.0.0.1" { t.Fatalf("bad: %v", node2) } @@ -716,13 +755,41 @@ func TestCatalogNodeServices(t *testing.T) { } func TestCatalogNodeServices_WanTranslation(t *testing.T) { - httpCtx1, httpCtx2 := setupWanHTTPServers(t) - defer shutdownHTTPServer(httpCtx1) - defer shutdownHTTPServer(httpCtx2) - srv1 := httpCtx1.srv - srv2 := httpCtx2.srv + dir1, srv1 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc1" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir1) + defer srv1.Shutdown() + defer srv1.agent.Shutdown() + testutil.WaitForLeader(t, srv1.agent.RPC, "dc1") - // Register a node with DC2 + dir2, srv2 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc2" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir2) + defer srv2.Shutdown() + defer srv2.agent.Shutdown() + testutil.WaitForLeader(t, srv2.agent.RPC, "dc2") + + // Wait for the WAN join. + addr := fmt.Sprintf("127.0.0.1:%d", + srv1.agent.config.Ports.SerfWan) + if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForResult( + func() (bool, error) { + return len(srv1.agent.WANMembers()) > 1, nil + }, + func(err error) { + t.Fatalf("Failed waiting for WAN join: %v", err) + }) + + // Register a node with DC2. { args := &structs.RegisterRequest{ Datacenter: "dc2", @@ -742,12 +809,11 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) { } } + // Query for the node in DC2 from DC1. req, err := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc2", nil) if err != nil { t.Fatalf("err: %v", err) } - - // ask DC1 for node in DC2 resp1 := httptest.NewRecorder() obj1, err1 := srv1.CatalogNodeServices(resp1, req) if err1 != nil { @@ -755,19 +821,17 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) { } assertIndex(t, resp1) + // Expect that DC1 gives us a WAN address (since the node is in DC2). services1 := obj1.(*structs.NodeServices) if len(services1.Services) != 1 { t.Fatalf("bad: %v", obj1) } - service1 := services1.Node - - // Expect that DC1 gives us a public address (since the node is in DC2) if service1.Address != "127.0.0.2" { t.Fatalf("bad: %v", service1) } - // ask DC2 for node in DC2 + // Query DC2 from DC2. resp2 := httptest.NewRecorder() obj2, err2 := srv2.CatalogNodeServices(resp2, req) if err2 != nil { @@ -775,14 +839,12 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) { } assertIndex(t, resp2) + // Expect that DC2 gives us a private address (since the node is in DC2). services2 := obj2.(*structs.NodeServices) if len(services2.Services) != 1 { t.Fatalf("bad: %v", obj2) } - service2 := services2.Node - - // Expect that DC2 gives us a private address (since the node is in DC2) if service2.Address != "127.0.0.1" { t.Fatalf("bad: %v", service2) } diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index 584a0ae2e6..47750b520c 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -555,13 +555,41 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) { } func TestHealthServiceNodes_WanTranslation(t *testing.T) { - httpCtx1, httpCtx2 := setupWanHTTPServers(t) - defer shutdownHTTPServer(httpCtx1) - defer shutdownHTTPServer(httpCtx2) - srv1 := httpCtx1.srv - srv2 := httpCtx2.srv + dir1, srv1 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc1" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir1) + defer srv1.Shutdown() + defer srv1.agent.Shutdown() + testutil.WaitForLeader(t, srv1.agent.RPC, "dc1") - // Register a node with DC2 + dir2, srv2 := makeHTTPServerWithConfig(t, + func(c *Config) { + c.Datacenter = "dc2" + c.TranslateWanAddrs = true + }) + defer os.RemoveAll(dir2) + defer srv2.Shutdown() + defer srv2.agent.Shutdown() + testutil.WaitForLeader(t, srv2.agent.RPC, "dc2") + + // Wait for the WAN join. + addr := fmt.Sprintf("127.0.0.1:%d", + srv1.agent.config.Ports.SerfWan) + if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForResult( + func() (bool, error) { + return len(srv1.agent.WANMembers()) > 1, nil + }, + func(err error) { + t.Fatalf("Failed waiting for WAN join: %v", err) + }) + + // Register a node with DC2. { args := &structs.RegisterRequest{ Datacenter: "dc2", @@ -581,51 +609,42 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) { } } + // Query for a service in DC2 from DC1. req, err := http.NewRequest("GET", "/v1/health/service/http_wan_translation_test?dc=dc2", nil) if err != nil { t.Fatalf("err: %v", err) } - - // ask DC1 for node in DC2 resp1 := httptest.NewRecorder() obj1, err1 := srv1.HealthServiceNodes(resp1, req) if err1 != nil { t.Fatalf("err: %v", err1) } - assertIndex(t, resp1) - // Should be 1 health check for consul + // Expect that DC1 gives us a WAN address (since the node is in DC2). nodes1 := obj1.(structs.CheckServiceNodes) if len(nodes1) != 1 { t.Fatalf("bad: %v", obj1) } - node1 := nodes1[0].Node - - // Expect that DC1 gives us a public address (since the node is in DC2) if node1.Address != "127.0.0.2" { t.Fatalf("bad: %v", node1) } - // ask DC2 for node in DC2 + // Query DC2 from DC2. resp2 := httptest.NewRecorder() obj2, err2 := srv2.HealthServiceNodes(resp2, req) if err2 != nil { t.Fatalf("err: %v", err2) } - assertIndex(t, resp2) - // Should be 1 health check for consul + // Expect that DC2 gives us a private address (since the node is in DC2). nodes2 := obj2.(structs.CheckServiceNodes) if len(nodes2) != 1 { t.Fatalf("bad: %v", obj2) } - node2 := nodes2[0].Node - - // Expect that DC2 gives us a private address (since the node is in DC2) if node2.Address != "127.0.0.1" { t.Fatalf("bad: %v", node2) } diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 7352c6186c..b6618977f3 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -23,50 +23,6 @@ import ( "github.com/hashicorp/go-cleanhttp" ) -type HTTPServerCtx struct { - dir string - srv *HTTPServer -} - -func setupWanHTTPServers(t *testing.T) (HTTPServerCtx, HTTPServerCtx) { - dir1, srv1 := makeHTTPServerWithConfig(t, - func(c *Config) { - c.Datacenter = "dc1" - c.TranslateWanAddrs = true - }) - - dir2, srv2 := makeHTTPServerWithConfig(t, - func(c *Config) { - c.Datacenter = "dc2" - c.TranslateWanAddrs = true - }) - - testutil.WaitForLeader(t, srv1.agent.RPC, "dc1") - testutil.WaitForLeader(t, srv2.agent.RPC, "dc2") - - // Join WAN cluster - addr := fmt.Sprintf("127.0.0.1:%d", - srv1.agent.config.Ports.SerfWan) - if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil { - t.Fatalf("err: %v", err) - } - - testutil.WaitForResult( - func() (bool, error) { - return len(srv1.agent.WANMembers()) > 1, nil - }, - func(err error) { - t.Fatalf("Failed waiting for WAN join: %v", err) - }) - return HTTPServerCtx{dir1, srv1}, HTTPServerCtx{dir2, srv2} -} - -func shutdownHTTPServer(httpCtx HTTPServerCtx) { - os.RemoveAll(httpCtx.dir) - httpCtx.srv.Shutdown() - httpCtx.srv.agent.Shutdown() -} - func makeHTTPServer(t *testing.T) (string, *HTTPServer) { return makeHTTPServerWithConfig(t, nil) } diff --git a/command/agent/prepared_query_endpoint_test.go b/command/agent/prepared_query_endpoint_test.go index ceac2f4408..0a7927f466 100644 --- a/command/agent/prepared_query_endpoint_test.go +++ b/command/agent/prepared_query_endpoint_test.go @@ -359,7 +359,7 @@ func TestPreparedQuery_Execute(t *testing.T) { } }) - // testing WAN translation in the response + // Ensure WAN translation occurs for a response outside of the local DC. httpTestWithConfig(t, func(srv *HTTPServer) { m := MockPreparedQuery{} if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil { @@ -368,9 +368,14 @@ func TestPreparedQuery_Execute(t *testing.T) { m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { nodesResponse := make(structs.CheckServiceNodes, 1) - nodesResponse[0].Node = &structs.Node{Node: "foo", Address: "127.0.0.1", - TaggedAddresses: map[string]string{"wan": "127.0.0.2"}} + nodesResponse[0].Node = &structs.Node{ + Node: "foo", Address: "127.0.0.1", + TaggedAddresses: map[string]string{ + "wan": "127.0.0.2", + }, + } reply.Nodes = nodesResponse + reply.Datacenter = "dc2" return nil } @@ -405,6 +410,57 @@ func TestPreparedQuery_Execute(t *testing.T) { c.TranslateWanAddrs = true }) + // Ensure WAN translation doesn't occur for the local DC. + httpTestWithConfig(t, func(srv *HTTPServer) { + m := MockPreparedQuery{} + if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil { + t.Fatalf("err: %v", err) + } + + m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { + nodesResponse := make(structs.CheckServiceNodes, 1) + nodesResponse[0].Node = &structs.Node{ + Node: "foo", Address: "127.0.0.1", + TaggedAddresses: map[string]string{ + "wan": "127.0.0.2", + }, + } + reply.Nodes = nodesResponse + reply.Datacenter = "dc1" + return nil + } + + body := bytes.NewBuffer(nil) + req, err := http.NewRequest("GET", "/v1/query/my-id/execute?dc=dc2", body) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.PreparedQuerySpecific(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 200 { + t.Fatalf("bad code: %d", resp.Code) + } + r, ok := obj.(structs.PreparedQueryExecuteResponse) + if !ok { + t.Fatalf("unexpected: %T", obj) + } + if r.Nodes == nil || len(r.Nodes) != 1 { + t.Fatalf("bad: %v", r) + } + + node := r.Nodes[0] + if node.Node.Address != "127.0.0.1" { + t.Fatalf("bad: %v", node.Node) + } + }, func(c *Config) { + c.Datacenter = "dc1" + c.TranslateWanAddrs = true + }) + httpTest(t, func(srv *HTTPServer) { body := bytes.NewBuffer(nil) req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body) From b8fa011972bd5a9f35e5b0fec48302619972c0a5 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 15 Aug 2016 15:47:15 -0700 Subject: [PATCH 5/5] Updates docs for WAN address translation and tweaks some nearby unrelated docs. --- .../source/docs/agent/options.html.markdown | 35 +++++++++++++------ .../docs/upgrade-specific.html.markdown | 7 ++++ 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index ff5e017e4c..7428d5d80d 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -750,10 +750,20 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `translate_wan_addrs` If set to true, Consul will prefer a node's configured WAN address - when servicing DNS requests for a node in a remote datacenter. This allows the node to be - reached within its own datacenter using its local address, and reached from other datacenters - using its WAN address, which is useful in hybrid setups with mixed networks. This is disabled - by default. + when servicing DNS and HTTP requests for a node in a remote datacenter. This allows the node to + be reached within its own datacenter using its local address, and reached from other datacenters + using its WAN address, which is useful in hybrid setups with mixed networks. This is disabled by + default. +
+
+ In addition to addresses in DNS responses, Consul will also translate node addresses in responses + to the following HTTP endpoints when querying a remote datacenter: +
+ * [`/v1/catalog/nodes`](/docs/agent/http/catalog.html#catalog_nodes) + * [`/v1/catalog/node/`](/docs/agent/http/catalog.html#catalog_node) + * [`/v1/catalog/service/`](/docs/agent/http/catalog.html#catalog_service) + * [`/v1/health/service/`](/docs/agent/http/health.html#health_service) + * [`/v1/query//execute`](/docs/agent/http/query.html#execute) * `ui` - Equivalent to the [`-ui`](#_ui) command-line flag. @@ -764,20 +774,23 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass * `unix_sockets` - This allows tuning the ownership and permissions of the Unix domain socket files created by Consul. Domain sockets are only used if - the HTTP or RPC addresses are configured with the `unix://` prefix. The - following options are valid within this construct and apply globally to all - sockets created by Consul: + the HTTP or RPC addresses are configured with the `unix://` prefix.
- * `user` - The name or ID of the user who will own the socket file. - * `group` - The group ID ownership of the socket file. Note that this option - currently only supports numeric IDs. - * `mode` - The permission bits to set on the file.
It is important to note that this option may have different effects on different operating systems. Linux generally observes socket file permissions while many BSD variants ignore permissions on the socket file itself. It is important to test this feature on your specific distribution. This feature is currently not functional on Windows hosts. +
+
+ The following options are valid within this construct and apply globally to all + sockets created by Consul: +
+ * `user` - The name or ID of the user who will own the socket file. + * `group` - The group ID ownership of the socket file. Note that this option + currently only supports numeric IDs. + * `mode` - The permission bits to set on the file. * `verify_incoming` - If set to true, Consul requires that all incoming diff --git a/website/source/docs/upgrade-specific.html.markdown b/website/source/docs/upgrade-specific.html.markdown index d575906be6..7a1b09414e 100644 --- a/website/source/docs/upgrade-specific.html.markdown +++ b/website/source/docs/upgrade-specific.html.markdown @@ -38,6 +38,13 @@ making them unable to properly serve requests for prepared queries using the feature. It is recommended that all agents be running version 0.7.0 prior to using this feature. +#### WAN Address Translation in HTTP Endpoints + +Consul version 0.7 added support for translating WAN addresses in certain +[HTTP endpoints](/docs/agent/options.html#translate_wan_addrs). The servers +and the agents need to be running version 0.7.0 or later in order to use this +feature. + ## Consul 0.6.4 Consul 0.6.4 made some substantial changes to how ACLs work with prepared