mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
d7f8a8e4ef
OSS portion of enterprise PR 1857. This removes (most) references to the `cache.UpdateEvent` type in the `proxycfg` package. As we're going to be replacing direct usage of the agent cache with interfaces that can be satisfied by alternative server-local datasources, it doesn't make sense to depend on this type everywhere anymore (particularly on the `state.ch` channel). We also plan to extract `proxycfg` out of Consul into a shared library in the future, which would require removing this dependency. Aside from a fairly rote find-and-replace, the main change is that the `cache.Cache` and `health.Client` types now accept a callback function parameter, rather than a `chan<- cache.UpdateEvents`. This allows us to do the type conversion without running another goroutine.
149 lines
4.1 KiB
Go
149 lines
4.1 KiB
Go
package health
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/submatview"
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
|
)
|
|
|
|
// Client provides access to service health data.
|
|
type Client struct {
|
|
NetRPC NetRPC
|
|
Cache CacheGetter
|
|
ViewStore MaterializedViewStore
|
|
MaterializerDeps MaterializerDeps
|
|
CacheName string
|
|
UseStreamingBackend bool
|
|
QueryOptionDefaults func(options *structs.QueryOptions)
|
|
}
|
|
|
|
// NetRPC is the RPC transport the Client uses to call a named method with the
// given arguments, decoding the response into reply.
type NetRPC interface {
	RPC(method string, args, reply interface{}) error
}
|
|
|
|
type CacheGetter interface {
|
|
Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
|
|
NotifyCallback(ctx context.Context, t string, r cache.Request, cID string, cb cache.Callback) error
|
|
}
|
|
|
|
type MaterializedViewStore interface {
|
|
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
|
|
NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error
|
|
}
|
|
|
|
func (c *Client) ServiceNodes(
|
|
ctx context.Context,
|
|
req structs.ServiceSpecificRequest,
|
|
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
|
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
|
|
c.QueryOptionDefaults(&req.QueryOptions)
|
|
|
|
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
|
|
if err != nil {
|
|
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
|
|
}
|
|
meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached}
|
|
return *result.Value.(*structs.IndexedCheckServiceNodes), meta, err
|
|
}
|
|
|
|
out, md, err := c.getServiceNodes(ctx, req)
|
|
if err != nil {
|
|
return out, md, err
|
|
}
|
|
|
|
// TODO: DNSServer emitted a metric here, do we still need it?
|
|
if req.QueryOptions.AllowStale && req.QueryOptions.MaxStaleDuration > 0 && out.QueryMeta.LastContact > req.MaxStaleDuration {
|
|
req.AllowStale = false
|
|
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
|
return out, cache.ResultMeta{}, err
|
|
}
|
|
|
|
return out, md, err
|
|
}
|
|
|
|
func (c *Client) getServiceNodes(
|
|
ctx context.Context,
|
|
req structs.ServiceSpecificRequest,
|
|
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
|
var out structs.IndexedCheckServiceNodes
|
|
if !req.QueryOptions.UseCache {
|
|
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
|
return out, cache.ResultMeta{}, err
|
|
}
|
|
|
|
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
|
|
if err != nil {
|
|
return out, md, err
|
|
}
|
|
|
|
value, ok := raw.(*structs.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
panic("wrong response type for cachetype.HealthServicesName")
|
|
}
|
|
|
|
return *value, md, nil
|
|
}
|
|
|
|
func (c *Client) Notify(
|
|
ctx context.Context,
|
|
req structs.ServiceSpecificRequest,
|
|
correlationID string,
|
|
cb cache.Callback,
|
|
) error {
|
|
if c.useStreaming(req) {
|
|
sr := c.newServiceRequest(req)
|
|
return c.ViewStore.NotifyCallback(ctx, sr, correlationID, cb)
|
|
}
|
|
|
|
return c.Cache.NotifyCallback(ctx, c.CacheName, &req, correlationID, cb)
|
|
}
|
|
|
|
func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
|
|
return c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
|
|
}
|
|
|
|
func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
|
|
return serviceRequest{
|
|
ServiceSpecificRequest: req,
|
|
deps: c.MaterializerDeps,
|
|
}
|
|
}
|
|
|
|
// Close any underlying connections used by the client.
|
|
func (c *Client) Close() error {
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
return c.MaterializerDeps.Conn.Close()
|
|
}
|
|
|
|
type serviceRequest struct {
|
|
structs.ServiceSpecificRequest
|
|
deps MaterializerDeps
|
|
}
|
|
|
|
func (r serviceRequest) CacheInfo() cache.RequestInfo {
|
|
return r.ServiceSpecificRequest.CacheInfo()
|
|
}
|
|
|
|
func (r serviceRequest) Type() string {
|
|
return "agent.rpcclient.health.serviceRequest"
|
|
}
|
|
|
|
func (r serviceRequest) NewMaterializer() (submatview.Materializer, error) {
|
|
view, err := newHealthView(r.ServiceSpecificRequest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
deps := submatview.Deps{
|
|
View: view,
|
|
Logger: r.deps.Logger,
|
|
Request: newMaterializerRequest(r.ServiceSpecificRequest),
|
|
}
|
|
|
|
return submatview.NewRPCMaterializer(pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn), deps), nil
|
|
}
|