mirror of https://github.com/status-im/consul.git
consul: send origin node + dc when executing prepared queries
This commit is contained in:
parent
03fea4b091
commit
d567d6a6d8
|
@ -592,7 +592,10 @@ RPC:
|
||||||
func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req, resp *dns.Msg) {
|
func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req, resp *dns.Msg) {
|
||||||
// Execute the prepared query.
|
// Execute the prepared query.
|
||||||
args := structs.PreparedQueryExecuteRequest{
|
args := structs.PreparedQueryExecuteRequest{
|
||||||
Origin: d.agent.config.NodeName,
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: d.agent.config.Datacenter,
|
||||||
|
Node: d.agent.config.NodeName,
|
||||||
|
},
|
||||||
Datacenter: datacenter,
|
Datacenter: datacenter,
|
||||||
QueryIDOrName: query,
|
QueryIDOrName: query,
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
|
|
|
@ -95,7 +95,10 @@ func parseLimit(req *http.Request, limit *int) error {
|
||||||
// preparedQueryExecute executes a prepared query.
|
// preparedQueryExecute executes a prepared query.
|
||||||
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
args := structs.PreparedQueryExecuteRequest{
|
args := structs.PreparedQueryExecuteRequest{
|
||||||
Origin: s.agent.config.NodeName,
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: s.agent.config.Datacenter,
|
||||||
|
Node: s.agent.config.NodeName,
|
||||||
|
},
|
||||||
QueryIDOrName: id,
|
QueryIDOrName: id,
|
||||||
}
|
}
|
||||||
s.parseSource(req, &args.Source)
|
s.parseSource(req, &args.Source)
|
||||||
|
|
|
@ -279,7 +279,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
|
|
||||||
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||||
expected := &structs.PreparedQueryExecuteRequest{
|
expected := &structs.PreparedQueryExecuteRequest{
|
||||||
Origin: srv.agent.config.NodeName,
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: srv.agent.config.Datacenter,
|
||||||
|
Node: srv.agent.config.NodeName,
|
||||||
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: "my-id",
|
QueryIDOrName: "my-id",
|
||||||
Limit: 5,
|
Limit: 5,
|
||||||
|
@ -351,7 +354,10 @@ func TestPreparedQuery_Explain(t *testing.T) {
|
||||||
|
|
||||||
m.explainFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error {
|
m.explainFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error {
|
||||||
expected := &structs.PreparedQueryExecuteRequest{
|
expected := &structs.PreparedQueryExecuteRequest{
|
||||||
Origin: srv.agent.config.NodeName,
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: srv.agent.config.Datacenter,
|
||||||
|
Node: srv.agent.config.NodeName,
|
||||||
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: "my-id",
|
QueryIDOrName: "my-id",
|
||||||
Limit: 5,
|
Limit: 5,
|
||||||
|
|
|
@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
||||||
Failover: structs.QueryDatacenterOptions{
|
Failover: structs.QueryDatacenterOptions{
|
||||||
Datacenters: []string{"dc1", "dc2"},
|
Datacenters: []string{"dc1", "dc2"},
|
||||||
},
|
},
|
||||||
|
Near: "_agent",
|
||||||
Tags: []string{"tag1", "tag2", "tag3"},
|
Tags: []string{"tag1", "tag2", "tag3"},
|
||||||
}
|
}
|
||||||
if err := walk(service, fn); err != nil {
|
if err := walk(service, fn); err != nil {
|
||||||
|
@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
||||||
".Service:the-service",
|
".Service:the-service",
|
||||||
".Failover.Datacenters[0]:dc1",
|
".Failover.Datacenters[0]:dc1",
|
||||||
".Failover.Datacenters[1]:dc2",
|
".Failover.Datacenters[1]:dc2",
|
||||||
|
".Near:_agent",
|
||||||
".Tags[0]:tag1",
|
".Tags[0]:tag1",
|
||||||
".Tags[1]:tag2",
|
".Tags[1]:tag2",
|
||||||
".Tags[2]:tag3",
|
".Tags[2]:tag3",
|
||||||
|
|
|
@ -384,7 +384,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
||||||
preferLocal := false
|
preferLocal := false
|
||||||
if sortFrom.Node == "_agent" {
|
if sortFrom.Node == "_agent" {
|
||||||
preferLocal = true
|
preferLocal = true
|
||||||
sortFrom.Node = args.Origin
|
sortFrom = args.Origin
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform the distance sort
|
// Perform the distance sort
|
||||||
|
@ -395,10 +395,10 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
||||||
// Nodes cannot be any "closer" than localhost, so this special case ensures
|
// Nodes cannot be any "closer" than localhost, so this special case ensures
|
||||||
// the local node is returned first if it is present in the result. This
|
// the local node is returned first if it is present in the result. This
|
||||||
// allows the local agent to be preferred even when network coordinates are
|
// allows the local agent to be preferred even when network coordinates are
|
||||||
// not enabled.
|
// not enabled. Only works if the results come from the request origin DC.
|
||||||
if preferLocal {
|
if preferLocal && reply.Datacenter == args.Origin.Datacenter {
|
||||||
for i, node := range reply.Nodes {
|
for i, node := range reply.Nodes {
|
||||||
if node.Node.Node == args.Origin {
|
if node.Node.Node == args.Origin.Node {
|
||||||
remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...)
|
remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...)
|
||||||
reply.Nodes = append([]structs.CheckServiceNode{node}, remote...)
|
reply.Nodes = append([]structs.CheckServiceNode{node}, remote...)
|
||||||
break
|
break
|
||||||
|
|
|
@ -1617,7 +1617,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
// Now try querying and make sure the local node is preferred.
|
// Now try querying and make sure the local node is preferred.
|
||||||
{
|
{
|
||||||
req := structs.PreparedQueryExecuteRequest{
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
Origin: "node1",
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node1",
|
||||||
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: query.Query.ID,
|
QueryIDOrName: query.Query.ID,
|
||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
@ -1642,7 +1645,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
// Falls back to remote nodes if service is not available locally
|
// Falls back to remote nodes if service is not available locally
|
||||||
{
|
{
|
||||||
req := structs.PreparedQueryExecuteRequest{
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
Origin: "not-in-result",
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node1",
|
||||||
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: query.Query.ID,
|
QueryIDOrName: query.Query.ID,
|
||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
@ -1658,6 +1664,40 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shuffles if the response comes from a non-local DC. We may
|
||||||
|
// need to try multiple times if at first we get a match by chance.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: "dc2",
|
||||||
|
Node: "node1",
|
||||||
|
},
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
|
||||||
|
shuffled := false
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if n := len(reply.Nodes); n != 10 {
|
||||||
|
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||||
|
}
|
||||||
|
if reply.Nodes[0].Node.Node != "node1" {
|
||||||
|
shuffled = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !shuffled {
|
||||||
|
t.Fatal("expect node shuffle for remote results")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Bake a non-local node name into Near parameter of the query. This
|
// Bake a non-local node name into Near parameter of the query. This
|
||||||
// node was seeded with a coordinate above so distance sort works.
|
// node was seeded with a coordinate above so distance sort works.
|
||||||
query.Query.Service.Near = "node3"
|
query.Query.Service.Near = "node3"
|
||||||
|
@ -1668,7 +1708,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
// Try the distance sort again to ensure the nearest node is returned
|
// Try the distance sort again to ensure the nearest node is returned
|
||||||
{
|
{
|
||||||
req := structs.PreparedQueryExecuteRequest{
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
Origin: "node1",
|
Origin: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node1",
|
||||||
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: query.Query.ID,
|
QueryIDOrName: query.Query.ID,
|
||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
|
|
@ -179,8 +179,10 @@ type PreparedQueryExecuteRequest struct {
|
||||||
Limit int
|
Limit int
|
||||||
|
|
||||||
// Origin is used to carry around a reference to the node which
|
// Origin is used to carry around a reference to the node which
|
||||||
// is executing the query on behalf of the client.
|
// is executing the query on behalf of the client. It is taken
|
||||||
Origin string
|
// as a QuerySource so that it can be used directly for queries
|
||||||
|
// relating to the agent servicing the request.
|
||||||
|
Origin QuerySource
|
||||||
|
|
||||||
// Source is used to sort the results relative to a given node using
|
// Source is used to sort the results relative to a given node using
|
||||||
// network coordinates.
|
// network coordinates.
|
||||||
|
|
Loading…
Reference in New Issue