Merge pull request #4023 from hashicorp/f-near-ip

Add near=_ip support for prepared queries
This commit is contained in:
Matt Keeler 2018-04-12 12:10:48 -04:00 committed by GitHub
commit d926679278
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 475 additions and 9 deletions

View File

@ -393,6 +393,31 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// Respect the magic "_agent" flag.
if qs.Node == "_agent" {
qs.Node = args.Agent.Node
} else if qs.Node == "_ip" {
if args.Source.Ip != "" {
_, nodes, err := state.Nodes(nil)
if err != nil {
return err
}
for _, node := range nodes {
if args.Source.Ip == node.Address {
qs.Node = node.Node
break
}
}
} else {
p.srv.logger.Printf("[WARN] Prepared Query using near=_ip requires "+
"the source IP to be set but none was provided. No distance "+
"sorting will be done.")
}
// Either a source IP was given but we couldnt find the associated node
// or no source ip was given. In both cases we should wipe the Node value
if qs.Node == "_ip" {
qs.Node = ""
}
}
// Perform the distance sort

View File

@ -270,7 +270,7 @@ func (d *DNSServer) handleQuery(resp dns.ResponseWriter, req *dns.Msg) {
m.SetRcode(req, dns.RcodeNotImplemented)
default:
d.dispatch(network, req, m)
d.dispatch(network, resp.RemoteAddr(), req, m)
}
// Handle EDNS
@ -362,7 +362,7 @@ func (d *DNSServer) nameservers(edns bool) (ns []dns.RR, extra []dns.RR) {
}
// dispatch is used to parse a request and invoke the correct handler
func (d *DNSServer) dispatch(network string, req, resp *dns.Msg) {
func (d *DNSServer) dispatch(network string, remoteAddr net.Addr, req, resp *dns.Msg) {
// By default the query is in the default datacenter
datacenter := d.agent.config.Datacenter
@ -439,7 +439,7 @@ PARSE:
// Allow a "." in the query name, just join all the parts.
query := strings.Join(labels[:n-1], ".")
d.preparedQueryLookup(network, datacenter, query, req, resp)
d.preparedQueryLookup(network, datacenter, query, remoteAddr, req, resp)
case "addr":
if n != 2 {
@ -917,8 +917,25 @@ func (d *DNSServer) serviceLookup(network, datacenter, service, tag string, req,
}
}
func ednsSubnetForRequest(req *dns.Msg) (*dns.EDNS0_SUBNET) {
// IsEdns0 returns the EDNS RR if present or nil otherwise
edns := req.IsEdns0()
if edns == nil {
return nil
}
for _, o := range edns.Option {
if subnet, ok := o.(*dns.EDNS0_SUBNET); ok {
return subnet
}
}
return nil;
}
// preparedQueryLookup is used to handle a prepared query.
func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req, resp *dns.Msg) {
func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, remoteAddr net.Addr, req, resp *dns.Msg) {
// Execute the prepared query.
args := structs.PreparedQueryExecuteRequest{
Datacenter: datacenter,
@ -938,6 +955,21 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
Node: d.agent.config.NodeName,
},
}
subnet := ednsSubnetForRequest(req)
if subnet != nil {
args.Source.Ip = subnet.Address.String()
} else {
switch v := remoteAddr.(type) {
case *net.UDPAddr:
args.Source.Ip = v.IP.String()
case *net.TCPAddr:
args.Source.Ip = v.IP.String()
case *net.IPAddr:
args.Source.Ip = v.IP.String()
}
}
// TODO (slackpad) - What's a safe limit we can set here? It seems like
// with dup filtering done at this level we need to get everything to
@ -1194,7 +1226,7 @@ func (d *DNSServer) resolveCNAME(name string) []dns.RR {
resp := &dns.Msg{}
req.SetQuestion(name, dns.TypeANY)
d.dispatch("udp", req, resp)
d.dispatch("udp", nil, req, resp)
return resp.Answer
}

View File

@ -9,6 +9,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
@ -16,6 +17,7 @@ import (
"github.com/hashicorp/consul/testutil/retry"
"github.com/miekg/dns"
"github.com/pascaldekloe/goe/verify"
"github.com/hashicorp/serf/coordinate"
)
const (
@ -1835,6 +1837,247 @@ func TestDNS_ServiceLookup_TagPeriod(t *testing.T) {
}
}
func TestDNS_PreparedQueryNearIPEDNS(t *testing.T) {
ipCoord := lib.GenerateCoordinate(1 * time.Millisecond)
serviceNodes := []struct{
name string
address string
coord *coordinate.Coordinate
}{
{"foo1", "198.18.0.1", lib.GenerateCoordinate(1 * time.Millisecond),},
{"foo2", "198.18.0.2", lib.GenerateCoordinate(10 * time.Millisecond),},
{"foo3", "198.18.0.3", lib.GenerateCoordinate(30 * time.Millisecond),},
}
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
added := 0
// Register nodes with a service
for _, cfg := range serviceNodes {
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: cfg.name,
Address: cfg.address,
Service: &structs.NodeService{
Service: "db",
Port: 12345,
},
}
var out struct{}
err := a.RPC("Catalog.Register", args, &out)
require.NoError(t, err)
// Send coordinate updates
coordArgs := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: cfg.name,
Coord: cfg.coord,
}
err = a.RPC("Coordinate.Update", &coordArgs, &out)
require.NoError(t, err)
added += 1
}
fmt.Printf("Added %d service nodes\n", added)
// Register a node without a service
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "198.18.0.9",
}
var out struct{}
err := a.RPC("Catalog.Register", args, &out)
require.NoError(t, err)
// Send coordinate updates for a few nodes.
coordArgs := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "bar",
Coord: ipCoord,
}
err = a.RPC("Coordinate.Update", &coordArgs, &out)
require.NoError(t, err)
}
// Register a prepared query Near = _ip
{
args := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "some.query.we.like",
Service: structs.ServiceQuery{
Service: "db",
Near: "_ip",
},
},
}
var id string
err := a.RPC("PreparedQuery.Apply", args, &id)
require.NoError(t, err)
}
retry.Run(t, func(r *retry.R) {
m := new(dns.Msg)
m.SetQuestion("some.query.we.like.query.consul.", dns.TypeA)
m.SetEdns0(4096, false)
o := new(dns.OPT)
o.Hdr.Name = "."
o.Hdr.Rrtype = dns.TypeOPT
e := new(dns.EDNS0_SUBNET)
e.Code = dns.EDNS0SUBNET
e.Family = 1
e.SourceNetmask = 32
e.SourceScope = 0
e.Address = net.ParseIP("198.18.0.9").To4()
o.Option = append(o.Option, e)
m.Extra = append(m.Extra, o)
c := new(dns.Client)
in, _, err := c.Exchange(m, a.DNSAddr())
if err != nil {
r.Fatalf("Error with call to dns.Client.Exchange: %s", err)
}
if len(serviceNodes) != len(in.Answer) {
r.Fatalf("Expecting %d A RRs in response, Actual found was %d", len(serviceNodes), len(in.Answer))
}
for i, rr := range in.Answer {
if aRec, ok := rr.(*dns.A); ok {
if actual := aRec.A.String(); serviceNodes[i].address != actual {
r.Fatalf("Expecting A RR #%d = %s, Actual RR was %s", i, serviceNodes[i].address, actual)
}
} else {
r.Fatalf("DNS Answer contained a non-A RR")
}
}
})
}
func TestDNS_PreparedQueryNearIP(t *testing.T) {
ipCoord := lib.GenerateCoordinate(1 * time.Millisecond)
serviceNodes := []struct{
name string
address string
coord *coordinate.Coordinate
}{
{"foo1", "198.18.0.1", lib.GenerateCoordinate(1 * time.Millisecond),},
{"foo2", "198.18.0.2", lib.GenerateCoordinate(10 * time.Millisecond),},
{"foo3", "198.18.0.3", lib.GenerateCoordinate(30 * time.Millisecond),},
}
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
added := 0
// Register nodes with a service
for _, cfg := range serviceNodes {
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: cfg.name,
Address: cfg.address,
Service: &structs.NodeService{
Service: "db",
Port: 12345,
},
}
var out struct{}
err := a.RPC("Catalog.Register", args, &out)
require.NoError(t, err)
// Send coordinate updates
coordArgs := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: cfg.name,
Coord: cfg.coord,
}
err = a.RPC("Coordinate.Update", &coordArgs, &out)
require.NoError(t, err)
added += 1
}
fmt.Printf("Added %d service nodes\n", added)
// Register a node without a service
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "198.18.0.9",
}
var out struct{}
err := a.RPC("Catalog.Register", args, &out)
require.NoError(t, err)
// Send coordinate updates for a few nodes.
coordArgs := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "bar",
Coord: ipCoord,
}
err = a.RPC("Coordinate.Update", &coordArgs, &out)
require.NoError(t, err)
}
// Register a prepared query Near = _ip
{
args := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "some.query.we.like",
Service: structs.ServiceQuery{
Service: "db",
Near: "_ip",
},
},
}
var id string
err := a.RPC("PreparedQuery.Apply", args, &id)
require.NoError(t, err)
}
retry.Run(t, func(r *retry.R) {
m := new(dns.Msg)
m.SetQuestion("some.query.we.like.query.consul.", dns.TypeA)
c := new(dns.Client)
in, _, err := c.Exchange(m, a.DNSAddr())
if err != nil {
r.Fatalf("Error with call to dns.Client.Exchange: %s", err)
}
if len(serviceNodes) != len(in.Answer) {
r.Fatalf("Expecting %d A RRs in response, Actual found was %d", len(serviceNodes), len(in.Answer))
}
for i, rr := range in.Answer {
if aRec, ok := rr.(*dns.A); ok {
if actual := aRec.A.String(); serviceNodes[i].address != actual {
r.Fatalf("Expecting A RR #%d = %s, Actual RR was %s", i, serviceNodes[i].address, actual)
}
} else {
r.Fatalf("DNS Answer contained a non-A RR")
}
}
})
}
func TestDNS_ServiceLookup_PreparedQueryNamePeriod(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")

View File

@ -498,11 +498,36 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
*token = s.agent.tokens.UserToken()
}
func sourceAddrFromRequest(req *http.Request) string {
xff := req.Header.Get("X-Forwarded-For")
forwardHosts := strings.Split(xff, ",")
if len(forwardHosts) > 0 {
forwardIp := net.ParseIP(strings.TrimSpace(forwardHosts[0]))
if forwardIp != nil {
return forwardIp.String()
}
}
host, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
return ""
}
ip := net.ParseIP(host)
if ip != nil {
return ip.String()
} else {
return ""
}
}
// parseSource is used to parse the ?near=<node> query parameter, used for
// sorting by RTT based on a source node. We set the source's DC to the target
// DC in the request, if given, or else the agent's DC.
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
s.parseDC(req, &source.Datacenter)
source.Ip = sourceAddrFromRequest(req)
if node := req.URL.Query().Get("near"); node != "" {
if node == "_agent" {
source.Node = s.agent.config.NodeName

View File

@ -327,6 +327,138 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("bad: %v", r)
}
})
t.Run("", func(t *testing.T) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
m := MockPreparedQuery{
executeFn: func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
expected := &structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: "my-id",
Limit: 5,
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "_ip",
Ip: "127.0.0.1",
},
Agent: structs.QuerySource{
Datacenter: a.Config.Datacenter,
Node: a.Config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
// Just set something so we can tell this is returned.
reply.Failovers = 99
return nil
},
}
if err := a.registerEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
req.Header.Add("X-Forwarded-For", "127.0.0.1")
resp := httptest.NewRecorder()
obj, err := a.srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueryExecuteResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if r.Failovers != 99 {
t.Fatalf("bad: %v", r)
}
})
t.Run("", func(t *testing.T) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
m := MockPreparedQuery{
executeFn: func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
expected := &structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: "my-id",
Limit: 5,
Source: structs.QuerySource{
Datacenter: "dc1",
Node: "_ip",
Ip: "198.18.0.1",
},
Agent: structs.QuerySource{
Datacenter: a.Config.Datacenter,
Node: a.Config.NodeName,
},
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
},
}
if !reflect.DeepEqual(args, expected) {
t.Fatalf("bad: %v", args)
}
// Just set something so we can tell this is returned.
reply.Failovers = 99
return nil
},
}
if err := a.registerEndpoint("PreparedQuery", &m); err != nil {
t.Fatalf("err: %v", err)
}
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
req.Header.Add("X-Forwarded-For", "198.18.0.1")
resp := httptest.NewRecorder()
obj, err := a.srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok := obj.(structs.PreparedQueryExecuteResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if r.Failovers != 99 {
t.Fatalf("bad: %v", r)
}
req, _ = http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
req.Header.Add("X-Forwarded-For", "198.18.0.1, 198.19.0.1")
resp = httptest.NewRecorder()
obj, err = a.srv.PreparedQuerySpecific(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("bad code: %d", resp.Code)
}
r, ok = obj.(structs.PreparedQueryExecuteResponse)
if !ok {
t.Fatalf("unexpected: %T", obj)
}
if r.Failovers != 99 {
t.Fatalf("bad: %v", r)
}
})
// Ensure the proper params are set when no special args are passed
t.Run("", func(t *testing.T) {

View File

@ -258,6 +258,7 @@ type QuerySource struct {
Datacenter string
Segment string
Node string
Ip string
}
// DCSpecificRequest is used to query about a specific DC

View File

@ -176,9 +176,15 @@ The table below shows this endpoint's support for
nearest instance to the specified node will be returned first, and subsequent
nodes in the response will be sorted in ascending order of estimated
round-trip times. If the node given does not exist, the nodes in the response
will be shuffled. Using `_agent` is supported, and will automatically return
results nearest the agent servicing the request. If unspecified, the response
will be shuffled by default.
will be shuffled. If unspecified, the response will be shuffled by default.
- `_agent` - Returns results nearest the agent servicing the request.
- `_ip` - Returns results nearest to the node associated with the source IP
where the query was executed from. For HTTP the source IP is the remote
peer's IP address or the value of the X-Forwarded-For header with the
header taking precedence. For DNS the source IP is the remote peer's IP
address or the value of the ENDS client IP with the EDNS client IP
taking precedence.
- `Service` `(Service: <required>)` - Specifies the structure to define the query's behavior.
@ -481,7 +487,9 @@ Token will be used.
- `near` `(string: "")` - Specifies to sort the resulting list in ascending
order based on the estimated round trip time from that node. Passing
`?near=_agent` will use the agent's node for the sort. If this is not present,
`?near=_agent` will use the agent's node for the sort. Passing `?near=_ip`
will use the source IP of the request or the value of the X-Forwarded-For
header to lookup the node to use for the sort. If this is not present,
the default behavior will shuffle the nodes randomly each time the query is
executed.