mirror of https://github.com/status-im/consul.git
GH-3798: More PR Updates
Update docs a little Update/add tests. Make sure all the various ways of determining the source IP work Update X-Forwarded-For header parsing. This can be a comma separated list with the first element being the original IP so we now handle csv data there. Got rid of error return from sourceAddrFromRequest
This commit is contained in:
parent
136efeb3be
commit
0619efc254
|
@ -918,7 +918,7 @@ func (d *DNSServer) serviceLookup(network, datacenter, service, tag string, req,
|
|||
}
|
||||
|
||||
func ednsSubnetForRequest(req *dns.Msg) (*dns.EDNS0_SUBNET) {
|
||||
// Its probably not obvious but IsEdns0 returns the EDNS RR if present or nil otherwise
|
||||
// IsEdns0 returns the EDNS RR if present or nil otherwise
|
||||
edns := req.IsEdns0()
|
||||
|
||||
if edns == nil {
|
||||
|
|
|
@ -1837,6 +1837,132 @@ func TestDNS_ServiceLookup_TagPeriod(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDNS_PreparedQueryNearIPEDNS(t *testing.T) {
|
||||
ipCoord := lib.GenerateCoordinate(1 * time.Millisecond)
|
||||
serviceNodes := []struct{
|
||||
name string
|
||||
address string
|
||||
coord *coordinate.Coordinate
|
||||
}{
|
||||
{"foo1", "198.18.0.1", lib.GenerateCoordinate(1 * time.Millisecond),},
|
||||
{"foo2", "198.18.0.2", lib.GenerateCoordinate(10 * time.Millisecond),},
|
||||
{"foo3", "198.18.0.3", lib.GenerateCoordinate(30 * time.Millisecond),},
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t.Name(), "")
|
||||
defer a.Shutdown()
|
||||
|
||||
added := 0
|
||||
|
||||
// Register nodes with a service
|
||||
for _, cfg := range serviceNodes {
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: cfg.name,
|
||||
Address: cfg.address,
|
||||
Service: &structs.NodeService{
|
||||
Service: "db",
|
||||
Port: 12345,
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
err := a.RPC("Catalog.Register", args, &out)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Send coordinate updates
|
||||
coordArgs := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: cfg.name,
|
||||
Coord: cfg.coord,
|
||||
}
|
||||
err = a.RPC("Coordinate.Update", &coordArgs, &out)
|
||||
require.NoError(t, err)
|
||||
|
||||
added += 1
|
||||
}
|
||||
|
||||
fmt.Printf("Added %d service nodes\n", added)
|
||||
|
||||
// Register a node without a service
|
||||
{
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "198.18.0.9",
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
err := a.RPC("Catalog.Register", args, &out)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Send coordinate updates for a few nodes.
|
||||
coordArgs := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Coord: ipCoord,
|
||||
}
|
||||
err = a.RPC("Coordinate.Update", &coordArgs, &out)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Register a prepared query Near = _ip
|
||||
{
|
||||
args := &structs.PreparedQueryRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.PreparedQueryCreate,
|
||||
Query: &structs.PreparedQuery{
|
||||
Name: "some.query.we.like",
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "db",
|
||||
Near: "_ip",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var id string
|
||||
err := a.RPC("PreparedQuery.Apply", args, &id)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion("some.query.we.like.query.consul.", dns.TypeA)
|
||||
m.SetEdns0(4096, false)
|
||||
o := new(dns.OPT)
|
||||
o.Hdr.Name = "."
|
||||
o.Hdr.Rrtype = dns.TypeOPT
|
||||
e := new(dns.EDNS0_SUBNET)
|
||||
e.Code = dns.EDNS0SUBNET
|
||||
e.Family = 1
|
||||
e.SourceNetmask = 32
|
||||
e.SourceScope = 0
|
||||
e.Address = net.ParseIP("198.18.0.9").To4()
|
||||
o.Option = append(o.Option, e)
|
||||
m.Extra = append(m.Extra, o)
|
||||
|
||||
c := new(dns.Client)
|
||||
in, _, err := c.Exchange(m, a.DNSAddr())
|
||||
if err != nil {
|
||||
r.Fatalf("Error with call to dns.Client.Exchange: %s", err)
|
||||
}
|
||||
|
||||
if len(serviceNodes) != len(in.Answer) {
|
||||
r.Fatalf("Expecting %d A RRs in response, Actual found was %d", len(serviceNodes), len(in.Answer))
|
||||
}
|
||||
|
||||
for i, rr := range in.Answer {
|
||||
if aRec, ok := rr.(*dns.A); ok {
|
||||
if actual := aRec.A.String(); serviceNodes[i].address != actual {
|
||||
r.Fatalf("Expecting A RR #%d = %s, Actual RR was %s", i, serviceNodes[i].address, actual)
|
||||
}
|
||||
} else {
|
||||
r.Fatalf("DNS Answer contained a non-A RR")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestDNS_PreparedQueryNearIP(t *testing.T) {
|
||||
ipCoord := lib.GenerateCoordinate(1 * time.Millisecond)
|
||||
serviceNodes := []struct{
|
||||
|
@ -1890,7 +2016,7 @@ func TestDNS_PreparedQueryNearIP(t *testing.T) {
|
|||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.1",
|
||||
Address: "198.18.0.9",
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
|
@ -1925,42 +2051,6 @@ func TestDNS_PreparedQueryNearIP(t *testing.T) {
|
|||
err := a.RPC("PreparedQuery.Apply", args, &id)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion("some.query.we.like.query.consul.", dns.TypeA)
|
||||
m.SetEdns0(4096, false)
|
||||
o := new(dns.OPT)
|
||||
o.Hdr.Name = "."
|
||||
o.Hdr.Rrtype = dns.TypeOPT
|
||||
e := new(dns.EDNS0_SUBNET)
|
||||
e.Code = dns.EDNS0SUBNET
|
||||
e.Family = 1
|
||||
e.SourceNetmask = 32
|
||||
e.SourceScope = 0
|
||||
e.Address = net.ParseIP("127.0.0.1").To4()
|
||||
o.Option = append(o.Option, e)
|
||||
m.Extra = append(m.Extra, o)
|
||||
|
||||
c := new(dns.Client)
|
||||
in, _, err := c.Exchange(m, a.DNSAddr())
|
||||
if err != nil {
|
||||
r.Fatalf("Error with call to dns.Client.Exchange: %s", err)
|
||||
}
|
||||
|
||||
if len(serviceNodes) != len(in.Answer) {
|
||||
r.Fatalf("Expecting %d A RRs in response, Actual found was %d", len(serviceNodes), len(in.Answer))
|
||||
}
|
||||
|
||||
for i, rr := range in.Answer {
|
||||
if aRec, ok := rr.(*dns.A); ok {
|
||||
if actual := aRec.A.String(); serviceNodes[i].address != actual {
|
||||
r.Fatalf("Expecting A RR #%d = %s, Actual RR was %s", i, serviceNodes[i].address, actual)
|
||||
}
|
||||
} else {
|
||||
r.Fatalf("DNS Answer contained a non-A RR")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
m := new(dns.Msg)
|
||||
|
|
|
@ -498,23 +498,26 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
|
|||
*token = s.agent.tokens.UserToken()
|
||||
}
|
||||
|
||||
func sourceAddrFromRequest(req *http.Request) (string, error) {
|
||||
forwardHost := req.Header.Get("X-Forwarded-For")
|
||||
forwardIp := net.ParseIP(forwardHost)
|
||||
if forwardIp != nil {
|
||||
return forwardIp.String(), nil
|
||||
}
|
||||
func sourceAddrFromRequest(req *http.Request) string {
|
||||
xff := req.Header.Get("X-Forwarded-For")
|
||||
forwardHosts := strings.Split(xff, ",")
|
||||
if len(forwardHosts) > 0 {
|
||||
forwardIp := net.ParseIP(strings.TrimSpace(forwardHosts[0]))
|
||||
if forwardIp != nil {
|
||||
return forwardIp.String()
|
||||
}
|
||||
}
|
||||
|
||||
host, _, err := net.SplitHostPort(req.RemoteAddr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return ""
|
||||
}
|
||||
|
||||
ip := net.ParseIP(host)
|
||||
if ip != nil {
|
||||
return ip.String(), nil
|
||||
return ip.String()
|
||||
} else {
|
||||
return "", fmt.Errorf("Could not get remote IP from HTTP Request")
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -524,7 +527,7 @@ func sourceAddrFromRequest(req *http.Request) (string, error) {
|
|||
// DC in the request, if given, or else the agent's DC.
|
||||
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
|
||||
s.parseDC(req, &source.Datacenter)
|
||||
source.Ip, _ = sourceAddrFromRequest(req)
|
||||
source.Ip = sourceAddrFromRequest(req)
|
||||
if node := req.URL.Query().Get("near"); node != "" {
|
||||
if node == "_agent" {
|
||||
source.Node = s.agent.config.NodeName
|
||||
|
|
|
@ -380,9 +380,66 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
if r.Failovers != 99 {
|
||||
t.Fatalf("bad: %v", r)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("", func(t *testing.T) {
|
||||
a := NewTestAgent(t.Name(), "")
|
||||
defer a.Shutdown()
|
||||
|
||||
m := MockPreparedQuery{
|
||||
executeFn: func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||
expected := &structs.PreparedQueryExecuteRequest{
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: "my-id",
|
||||
Limit: 5,
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "_ip",
|
||||
Ip: "198.18.0.1",
|
||||
},
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: a.Config.Datacenter,
|
||||
Node: a.Config.NodeName,
|
||||
},
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: "my-token",
|
||||
RequireConsistent: true,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(args, expected) {
|
||||
t.Fatalf("bad: %v", args)
|
||||
}
|
||||
|
||||
// Just set something so we can tell this is returned.
|
||||
reply.Failovers = 99
|
||||
return nil
|
||||
},
|
||||
}
|
||||
if err := a.registerEndpoint("PreparedQuery", &m); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, _ := http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
|
||||
req.Header.Add("X-Forwarded-For", "198.18.0.1")
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.PreparedQuerySpecific(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
r, ok := obj.(structs.PreparedQueryExecuteResponse)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected: %T", obj)
|
||||
}
|
||||
if r.Failovers != 99 {
|
||||
t.Fatalf("bad: %v", r)
|
||||
}
|
||||
|
||||
req, _ = http.NewRequest("GET", "/v1/query/my-id/execute?token=my-token&consistent=true&near=_ip&limit=5", body)
|
||||
req.RemoteAddr = "127.0.0.1:12345"
|
||||
req.Header.Add("X-Forwarded-For", "198.18.0.1, 198.19.0.1")
|
||||
resp = httptest.NewRecorder()
|
||||
obj, err = a.srv.PreparedQuerySpecific(resp, req)
|
||||
if err != nil {
|
||||
|
|
|
@ -176,14 +176,15 @@ The table below shows this endpoint's support for
|
|||
nearest instance to the specified node will be returned first, and subsequent
|
||||
nodes in the response will be sorted in ascending order of estimated
|
||||
round-trip times. If the node given does not exist, the nodes in the response
|
||||
will be shuffled. Using `_agent` is supported, and will automatically return
|
||||
results nearest the agent servicing the request. Using `_ip` is supported and
|
||||
will automatically return results nearest to the node associated with the
|
||||
source IP where the query is executed from. For HTTP the source IP is the
|
||||
remote peer's IP address or the value of the X-Forwarded-For header with the
|
||||
header taking precedence. For DNS the source IP is the value of the EDNS
|
||||
client IP or the remote peer's IP address. If unspecified, the response
|
||||
will be shuffled by default.
|
||||
will be shuffled. If unspecified, the response will be shuffled by default.
|
||||
|
||||
- `_agent` - Returns results nearest the agent servicing the request.
|
||||
- `_ip` - Returns results nearest to the node associated with the source IP
|
||||
where the query was executed from. For HTTP the source IP is the remote
|
||||
peer's IP address or the value of the X-Forwarded-For header with the
|
||||
header taking precedence. For DNS the source IP is the remote peer's IP
|
||||
address or the value of the ENDS client IP with the EDNS client IP
|
||||
taking precedence.
|
||||
|
||||
- `Service` `(Service: <required>)` - Specifies the structure to define the query's behavior.
|
||||
|
||||
|
|
Loading…
Reference in New Issue