Merge pull request #8806 from hashicorp/dnephin/service-health-interface

rpcclient: Add health.Client and use it in http and dns
This commit is contained in:
Daniel Nephin 2020-10-06 12:13:09 -04:00 committed by GitHub
commit a5c50c982d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 75 deletions

View File

@ -17,15 +17,16 @@ import (
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/agent/dns" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/net/http2"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae" "github.com/hashicorp/consul/agent/ae"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
@ -33,10 +34,13 @@ import (
"github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/dns"
"github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd" "github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds" "github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch" "github.com/hashicorp/consul/api/watch"
@ -46,10 +50,6 @@ import (
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/net/http2"
) )
const ( const (
@ -311,6 +311,10 @@ type Agent struct {
// they can update their internal state. // they can update their internal state.
configReloaders []ConfigReloader configReloaders []ConfigReloader
// TODO: pass directly to HTTPHandlers and DNSServer once those are passed
// into Agent, which will allow us to remove this field.
rpcClientHealth *health.Client
// enterpriseAgent embeds fields that we only access in consul-enterprise builds // enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent enterpriseAgent
} }
@ -355,6 +359,8 @@ func New(bd BaseDeps) (*Agent, error) {
cache: bd.Cache, cache: bd.Cache,
} }
a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a}
a.serviceManager = NewServiceManager(&a) a.serviceManager = NewServiceManager(&a)
// TODO: do this somewhere else, maybe move to newBaseDeps // TODO: do this somewhere else, maybe move to newBaseDeps

View File

@ -13,6 +13,9 @@ import (
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
radix "github.com/armon/go-radix" radix "github.com/armon/go-radix"
"github.com/coredns/coredns/plugin/pkg/dnsutil" "github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/hashicorp/go-hclog"
"github.com/miekg/dns"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
agentdns "github.com/hashicorp/consul/agent/dns" agentdns "github.com/hashicorp/consul/agent/dns"
@ -21,8 +24,6 @@ import (
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"github.com/miekg/dns"
) )
const ( const (
@ -1162,47 +1163,16 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st
Token: d.agent.tokens.UserToken(), Token: d.agent.tokens.UserToken(),
AllowStale: cfg.AllowStale, AllowStale: cfg.AllowStale,
MaxAge: cfg.CacheMaxAge, MaxAge: cfg.CacheMaxAge,
UseCache: cfg.UseCache,
MaxStaleDuration: cfg.MaxStale,
}, },
EnterpriseMeta: lookup.EnterpriseMeta, EnterpriseMeta: lookup.EnterpriseMeta,
} }
var out structs.IndexedCheckServiceNodes out, _, err := d.agent.rpcClientHealth.ServiceNodes(context.TODO(), args)
if cfg.UseCache {
raw, m, err := d.agent.cache.Get(context.TODO(), cachetype.HealthServicesName, &args)
if err != nil { if err != nil {
return out, err return out, err
} }
reply, ok := raw.(*structs.IndexedCheckServiceNodes)
if !ok {
// This should never happen, but we want to protect against panics
return out, fmt.Errorf("internal error: response type not correct")
}
d.logger.Trace("cache results for service",
"cache_hit", m.Hit,
"service", lookup.Service,
)
out = *reply
} else {
if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return out, err
}
}
if args.AllowStale && out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
// redo the request the response was too stale
if args.AllowStale && out.LastContact > cfg.MaxStale {
args.AllowStale = false
d.logger.Warn("Query results too stale, re-requesting")
if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return structs.IndexedCheckServiceNodes{}, err
}
}
// Filter out any service nodes due to health checks // Filter out any service nodes due to health checks
// We copy the slice to avoid modifying the result if it comes from the cache // We copy the slice to avoid modifying the result if it comes from the cache

View File

@ -7,7 +7,6 @@ import (
"strconv" "strconv"
"strings" "strings"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
) )
@ -220,35 +219,21 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
return nil, nil return nil, nil
} }
// Make the RPC request // TODO: handle this for all endpoints in parseConsistency
var out structs.IndexedCheckServiceNodes args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && args.QueryOptions.UseCache
defer setMeta(resp, &out.QueryMeta)
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.HealthServicesName, &args)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer setCacheMeta(resp, &m)
reply, ok := raw.(*structs.IndexedCheckServiceNodes) if args.QueryOptions.UseCache {
if !ok { setCacheMeta(resp, &md)
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
out = *reply
} else {
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
} }
setMeta(resp, &out.QueryMeta)
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel() out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
// FIXME: argument parsing should be done before performing the rpc
// Filter to only passing if specified // Filter to only passing if specified
filter, err := getBoolQueryParam(params, api.HealthPassing) filter, err := getBoolQueryParam(params, api.HealthPassing)
if err != nil { if err != nil {
@ -257,6 +242,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
return nil, nil return nil, nil
} }
// FIXME: remove filterNonPassing, replace with nodes.Filter, which is used by DNSServer
if filter { if filter {
out.Nodes = filterNonPassing(out.Nodes) out.Nodes = filterNonPassing(out.Nodes)
} }

View File

@ -0,0 +1,64 @@
package health
import (
"context"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
)
type Client struct {
NetRPC NetRPC
Cache CacheGetter
}
type NetRPC interface {
RPC(method string, args interface{}, reply interface{}) error
}
type CacheGetter interface {
Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
}
func (c *Client) ServiceNodes(
ctx context.Context,
req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
out, md, err := c.getServiceNodes(ctx, req)
if err != nil {
return out, md, err
}
// TODO: DNSServer emitted a metric here, do we still need it?
if req.QueryOptions.AllowStale && req.QueryOptions.MaxStaleDuration > 0 && out.QueryMeta.LastContact > req.MaxStaleDuration {
req.AllowStale = false
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
return out, cache.ResultMeta{}, err
}
return out, md, err
}
func (c *Client) getServiceNodes(
ctx context.Context,
req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
var out structs.IndexedCheckServiceNodes
if !req.QueryOptions.UseCache {
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
return out, cache.ResultMeta{}, err
}
raw, md, err := c.Cache.Get(ctx, cachetype.HealthServicesName, &req)
if err != nil {
return out, md, err
}
value, ok := raw.(*structs.IndexedCheckServiceNodes)
if !ok {
panic("wrong response type for cachetype.HealthServicesName")
}
return *value, md, nil
}