From d567d6a6d808e927965660ec0b2e43b7a7fbf4cd Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 21 Jun 2016 15:34:26 -0700 Subject: [PATCH] consul: send origin node + dc when executing prepared queries --- command/agent/dns.go | 5 +- command/agent/prepared_query_endpoint.go | 5 +- command/agent/prepared_query_endpoint_test.go | 10 +++- consul/prepared_query/walk_test.go | 2 + consul/prepared_query_endpoint.go | 8 +-- consul/prepared_query_endpoint_test.go | 49 +++++++++++++++++-- consul/structs/prepared_query.go | 6 ++- 7 files changed, 72 insertions(+), 13 deletions(-) diff --git a/command/agent/dns.go b/command/agent/dns.go index ea23d088d8..28b8fee905 100644 --- a/command/agent/dns.go +++ b/command/agent/dns.go @@ -592,7 +592,10 @@ RPC: func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req, resp *dns.Msg) { // Execute the prepared query. args := structs.PreparedQueryExecuteRequest{ - Origin: d.agent.config.NodeName, + Origin: structs.QuerySource{ + Datacenter: d.agent.config.Datacenter, + Node: d.agent.config.NodeName, + }, Datacenter: datacenter, QueryIDOrName: query, QueryOptions: structs.QueryOptions{ diff --git a/command/agent/prepared_query_endpoint.go b/command/agent/prepared_query_endpoint.go index ad27933957..a70944b8af 100644 --- a/command/agent/prepared_query_endpoint.go +++ b/command/agent/prepared_query_endpoint.go @@ -95,7 +95,10 @@ func parseLimit(req *http.Request, limit *int) error { // preparedQueryExecute executes a prepared query. func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { args := structs.PreparedQueryExecuteRequest{ - Origin: s.agent.config.NodeName, + Origin: structs.QuerySource{ + Datacenter: s.agent.config.Datacenter, + Node: s.agent.config.NodeName, + }, QueryIDOrName: id, } s.parseSource(req, &args.Source) diff --git a/command/agent/prepared_query_endpoint_test.go b/command/agent/prepared_query_endpoint_test.go index 138124c69e..9c7e540f37 100644 --- a/command/agent/prepared_query_endpoint_test.go +++ b/command/agent/prepared_query_endpoint_test.go @@ -279,7 +279,10 @@ func TestPreparedQuery_Execute(t *testing.T) { m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { expected := &structs.PreparedQueryExecuteRequest{ - Origin: srv.agent.config.NodeName, + Origin: structs.QuerySource{ + Datacenter: srv.agent.config.Datacenter, + Node: srv.agent.config.NodeName, + }, Datacenter: "dc1", QueryIDOrName: "my-id", Limit: 5, @@ -351,7 +354,10 @@ func TestPreparedQuery_Explain(t *testing.T) { m.explainFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error { expected := &structs.PreparedQueryExecuteRequest{ - Origin: srv.agent.config.NodeName, + Origin: structs.QuerySource{ + Datacenter: srv.agent.config.Datacenter, + Node: srv.agent.config.NodeName, + }, Datacenter: "dc1", QueryIDOrName: "my-id", Limit: 5, diff --git a/consul/prepared_query/walk_test.go b/consul/prepared_query/walk_test.go index db9a75c1cb..05294e3b65 100644 --- a/consul/prepared_query/walk_test.go +++ b/consul/prepared_query/walk_test.go @@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) { Failover: structs.QueryDatacenterOptions{ Datacenters: []string{"dc1", "dc2"}, }, + Near: "_agent", Tags: []string{"tag1", "tag2", "tag3"}, } if err := walk(service, fn); err != nil { @@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) { ".Service:the-service", ".Failover.Datacenters[0]:dc1", ".Failover.Datacenters[1]:dc2", + ".Near:_agent", ".Tags[0]:tag1", ".Tags[1]:tag2", ".Tags[2]:tag3", diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index 6b41c907c0..92f5dff05c 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -384,7 +384,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, preferLocal := false if sortFrom.Node == "_agent" { preferLocal = true - sortFrom.Node = args.Origin + sortFrom = args.Origin } // Perform the distance sort @@ -395,10 +395,10 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, // Nodes cannot be any "closer" than localhost, so this special case ensures // the local node is returned first if it is present in the result. This // allows the local agent to be preferred even when network coordinates are - // not enabled. - if preferLocal { + // not enabled. Only works if the results come from the request origin DC. + if preferLocal && reply.Datacenter == args.Origin.Datacenter { for i, node := range reply.Nodes { - if node.Node.Node == args.Origin { + if node.Node.Node == args.Origin.Node { remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...) reply.Nodes = append([]structs.CheckServiceNode{node}, remote...) break diff --git a/consul/prepared_query_endpoint_test.go b/consul/prepared_query_endpoint_test.go index 86b0303896..75d3905bc6 100644 --- a/consul/prepared_query_endpoint_test.go +++ b/consul/prepared_query_endpoint_test.go @@ -1617,7 +1617,10 @@ func TestPreparedQuery_Execute(t *testing.T) { // Now try querying and make sure the local node is preferred. { req := structs.PreparedQueryExecuteRequest{ - Origin: "node1", + Origin: structs.QuerySource{ + Datacenter: "dc1", + Node: "node1", + }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, QueryOptions: structs.QueryOptions{Token: execToken}, @@ -1642,7 +1645,10 @@ func TestPreparedQuery_Execute(t *testing.T) { // Falls back to remote nodes if service is not available locally { req := structs.PreparedQueryExecuteRequest{ - Origin: "not-in-result", + Origin: structs.QuerySource{ + Datacenter: "dc1", + Node: "node1", + }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, QueryOptions: structs.QueryOptions{Token: execToken}, @@ -1658,6 +1664,40 @@ func TestPreparedQuery_Execute(t *testing.T) { } } + // Shuffles if the response comes from a non-local DC. We may + // need to try multiple times if at first we get a match by chance. + { + req := structs.PreparedQueryExecuteRequest{ + Origin: structs.QuerySource{ + Datacenter: "dc2", + Node: "node1", + }, + Datacenter: "dc1", + QueryIDOrName: query.Query.ID, + QueryOptions: structs.QueryOptions{Token: execToken}, + } + + var reply structs.PreparedQueryExecuteResponse + + shuffled := false + for i := 0; i < 10; i++ { + if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if n := len(reply.Nodes); n != 10 { + t.Fatalf("expect 10 nodes, got: %d", n) + } + if reply.Nodes[0].Node.Node != "node1" { + shuffled = true + break + } + } + + if !shuffled { + t.Fatal("expect node shuffle for remote results") + } + } + // Bake a non-local node name into Near parameter of the query. This // node was seeded with a coordinate above so distance sort works. query.Query.Service.Near = "node3" @@ -1668,7 +1708,10 @@ func TestPreparedQuery_Execute(t *testing.T) { // Try the distance sort again to ensure the nearest node is returned { req := structs.PreparedQueryExecuteRequest{ - Origin: "node1", + Origin: structs.QuerySource{ + Datacenter: "dc1", + Node: "node1", + }, Datacenter: "dc1", QueryIDOrName: query.Query.ID, QueryOptions: structs.QueryOptions{Token: execToken}, diff --git a/consul/structs/prepared_query.go b/consul/structs/prepared_query.go index 49ad25a980..ef2896ca38 100644 --- a/consul/structs/prepared_query.go +++ b/consul/structs/prepared_query.go @@ -179,8 +179,10 @@ type PreparedQueryExecuteRequest struct { Limit int // Origin is used to carry around a reference to the node which - // is executing the query on behalf of the client. - Origin string + // is executing the query on behalf of the client. It is taken + // as a QuerySource so that it can be used directly for queries + // relating to the agent servicing the request. + Origin QuerySource // Source is used to sort the results relative to a given node using // network coordinates.