consul: use source parameter for near prepared queries

This commit is contained in:
Ryan Uber 2016-06-30 12:11:20 -07:00
parent c457ee0075
commit 0c2ad07fa9
4 changed files with 175 additions and 139 deletions

View File

@ -369,40 +369,21 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// requested an RTT sort. // requested an RTT sort.
reply.Nodes.Shuffle() reply.Nodes.Shuffle()
// Get the source to sort by. This can be passed in by the requestor, or // Check if the query carries a Near parameter, or if the requestor
// pre-defined using the Near parameter in the prepared query. If the // supplied a ?near parameter in the request. We can apply distance
// near parameter was defined, that will be preferred. // sorting if either of these cases are true, but we don't want to
sortFrom := args.Source // affect the established round-robin default.
if query.Service.Near != "" { if args.Source.NearRequested || query.Service.Near != "" {
sortFrom = structs.QuerySource{ // Apply the "near" parameter if it exists on the prepared query and
Datacenter: args.Datacenter, // was not provided in the request args.
Node: query.Service.Near, if !args.Source.NearRequested && query.Service.Near != "_agent" {
args.Source.Node = query.Service.Near
} }
}
// Respect the magic "_agent" flag. // Perform the distance sort
preferLocal := false err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
if sortFrom.Node == "_agent" { if err != nil {
preferLocal = true return err
sortFrom = args.Origin
}
// Perform the distance sort
if err := p.srv.sortNodesByDistanceFrom(sortFrom, reply.Nodes); err != nil {
return err
}
// Nodes cannot be any "closer" than localhost, so this special case ensures
// the local node is returned first if it is present in the result. This
// allows the local agent to be preferred even when network coordinates are
// not enabled. Only works if the results come from the request origin DC.
if preferLocal && reply.Datacenter == args.Origin.Datacenter {
for i, node := range reply.Nodes {
if node.Node.Node == args.Origin.Node {
remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...)
reply.Nodes = append([]structs.CheckServiceNode{node}, remote...)
break
}
} }
} }

View File

@ -1548,8 +1548,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
QueryIDOrName: query.Query.ID, QueryIDOrName: query.Query.ID,
Source: structs.QuerySource{ Source: structs.QuerySource{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node3", Node: "node3",
NearRequested: true,
}, },
QueryOptions: structs.QueryOptions{Token: execToken}, QueryOptions: structs.QueryOptions{Token: execToken},
} }
@ -1607,110 +1608,22 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques)) t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
} }
// Set the query to prefer a colocated service using the magic _agent token // Set the query to return results nearest to node3. This is the only
// node with coordinates, and it carries the service we are asking for,
// so node3 should always show up first.
query.Op = structs.PreparedQueryUpdate query.Op = structs.PreparedQueryUpdate
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}
// Now try querying and make sure the local node is preferred.
{
req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: "dc1",
Node: "node1",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
// Repeat this a few times to make sure we don't just get lucky.
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node1" {
t.Fatalf("expect node1 first, got: %q", node)
}
}
}
// Falls back to remote nodes if service is not available locally
{
req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: "dc1",
Node: "node1",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
}
// Shuffles if the response comes from a non-local DC. We may
// need to try multiple times if at first we get a match by chance.
{
req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{
Datacenter: "dc2",
Node: "node1",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if reply.Nodes[0].Node.Node != "node1" {
shuffled = true
break
}
}
if !shuffled {
t.Fatal("expect node shuffle for remote results")
}
}
// Bake a non-local node name into Near parameter of the query. This
// node was seeded with a coordinate above so distance sort works.
query.Query.Service.Near = "node3" query.Query.Service.Near = "node3"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil { if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Try the distance sort again to ensure the nearest node is returned // Now run the query and make sure the baked-in service is returned.
{ {
req := structs.PreparedQueryExecuteRequest{ req := structs.PreparedQueryExecuteRequest{
Origin: structs.QuerySource{ Source: structs.QuerySource{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "foo",
NearRequested: false,
}, },
Datacenter: "dc1", Datacenter: "dc1",
QueryIDOrName: query.Query.ID, QueryIDOrName: query.Query.ID,
@ -1727,15 +1640,158 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("expect 10 nodes, got: %d", n) t.Fatalf("expect 10 nodes, got: %d", n)
} }
if node := reply.Nodes[0].Node.Node; node != "node3" { if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3, got: %q", node) t.Fatalf("expect node3 first, got: %q", node)
} }
} }
} }
// Un-bake the Near parameter. // Query again, but this time set NearRequested to "true". This should
// prove that we allow overriding the baked-in value with ?near.
{
// Set up the query with a non-existent node. This will cause the
// nodes to be shuffled if the passed node is respected, proving
// that we allow the override to happen.
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "foo",
NearRequested: true,
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}
if !shuffled {
t.Fatalf("expect nodes to be shuffled")
}
}
// Check that if NearRequested is passed as true, that we sort based
// on the given node and do not use the one stored in the PQ.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "node1",
NearRequested: true,
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
// We just want to check that we get a non-local node, because
// the local node is the only one with working coordinates.
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
shuffled = true
break
}
}
if !shuffled {
t.Fatal("expect non-local results")
}
}
// Set the query to prefer a colocated service using the magic _agent token
query.Query.Service.Near = "_agent"
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
}
// Check that the node returned is the one we asked for in the
// query source. This proves that if the PQ has "_agent" baked
// in, we always use the passed-in node.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if node := reply.Nodes[0].Node.Node; node != "node3" {
t.Fatalf("expect node3 first, got: %q", node)
}
}
}
// Shuffles if the response comes from a non-local DC. Proves that the
// near parameter does not affect this order.
{
req := structs.PreparedQueryExecuteRequest{
Source: structs.QuerySource{
Datacenter: "dc2",
Node: "node3",
},
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
shuffled := false
for i := 0; i < 10; i++ {
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if n := len(reply.Nodes); n != 10 {
t.Fatalf("expect 10 nodes, got: %d", n)
}
if reply.Nodes[0].Node.Node != "node3" {
shuffled = true
break
}
}
if !shuffled {
t.Fatal("expect node shuffle for remote results")
}
}
// Un-bake the near parameter.
query.Query.Service.Near = "" query.Query.Service.Near = ""
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil { if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err:% v", err) t.Fatalf("err: %v", err)
} }
// Update the health of a node to mark it critical. // Update the health of a node to mark it critical.

View File

@ -178,12 +178,6 @@ type PreparedQueryExecuteRequest struct {
// Limit will trim the resulting list down to the given limit. // Limit will trim the resulting list down to the given limit.
Limit int Limit int
// Origin is used to carry around a reference to the node which
// is executing the query on behalf of the client. It is taken
// as a QuerySource so that it can be used directly for queries
// relating to the agent servicing the request.
Origin QuerySource
// Source is used to sort the results relative to a given node using // Source is used to sort the results relative to a given node using
// network coordinates. // network coordinates.
Source QuerySource Source QuerySource

View File

@ -196,6 +196,11 @@ func (r *DeregisterRequest) RequestDatacenter() string {
type QuerySource struct { type QuerySource struct {
Datacenter string Datacenter string
Node string Node string
// NearRequested indicates where the values in this QuerySource came
// from. When true, the values were provided by the requestor,
// otherwise they were filled by the agent servicing the request.
NearRequested bool
} }
// DCSpecificRequest is used to query about a specific DC // DCSpecificRequest is used to query about a specific DC