mirror of https://github.com/status-im/consul.git
Fix issue with streaming service health watches. (#17775)
Fix issue with streaming service health watches. This commit fixes an issue where the health streams were unaware of service export changes. Whenever an exported-services config entry is modified, it is effectively an ACL change. The bug would be triggered by the following situation: - no services are exported - an upstream watch to service X is spawned - the streaming backend filters out data for service X (due to lack of exports) - service X is finally exported In the situation above, the streaming backend does not trigger a refresh of its data. This means that any events that were supposed to have been received prior to the export are NOT backfilled, and the watches never see service X spawning. We currently have decided to not trigger a stream refresh in this situation due to the potential for a thundering herd effect (touching exports would cause a re-fetch of all watches for that partition, potentially). Therefore, a local blocking-query approach was added by this commit for agentless. It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful, because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing. This means that while agents should technically have this same issue, they don't experience it with mesh health watches. Note that this is a temporary fix that solves the issue for proxycfg, but not service-discovery use cases.
This commit is contained in:
parent
ad0a277e09
commit
04edace1de
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
connect: Fix issue where changes to service exports were not reflected in proxies.
|
||||
```
|
|
@ -4555,7 +4555,11 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
|||
sources.ExportedPeeredServices = proxycfgglue.ServerExportedPeeredServices(deps)
|
||||
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
|
||||
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
|
||||
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||
// We do not use this health check currently due to a bug with the way that service exports
|
||||
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
|
||||
// for more details.
|
||||
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
|
||||
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
|
||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||
|
|
|
@ -16,8 +16,9 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
ErrorNotFound = errors.New("no data found for query")
|
||||
ErrorNotChanged = errors.New("data did not change for query")
|
||||
ErrorNotFound = errors.New("no data found for query")
|
||||
ErrorNotChanged = errors.New("data did not change for query")
|
||||
ErrorACLResetData = errors.New("an acl update forced a state reset")
|
||||
|
||||
errNilContext = errors.New("cannot call ServerLocalNotify with a nil context")
|
||||
errNilGetStore = errors.New("cannot call ServerLocalNotify without a callback to get a StateStore")
|
||||
|
@ -320,8 +321,15 @@ func serverLocalNotifyRoutine[ResultType any, StoreType StateStore](
|
|||
return
|
||||
}
|
||||
|
||||
// An ACL reset error can be raised so that the index greater-than check is
|
||||
// bypassed. We should not propagate it to the caller.
|
||||
forceReset := errors.Is(err, ErrorACLResetData)
|
||||
if forceReset {
|
||||
err = nil
|
||||
}
|
||||
|
||||
// Check the index to see if we should call notify
|
||||
if minIndex == 0 || minIndex < index {
|
||||
if minIndex == 0 || minIndex < index || forceReset {
|
||||
notify(ctx, correlationID, result, err)
|
||||
minIndex = index
|
||||
}
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package proxycfgglue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/watch"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/structs/aclfilter"
|
||||
)
|
||||
|
||||
// ServerHealthBlocking exists due to a bug with the streaming backend and its interaction with ACLs.
|
||||
// Whenever an exported-services config entry is modified, this is effectively an ACL change.
|
||||
// Assume the following situation:
|
||||
// - no services are exported
|
||||
// - an upstream watch to service X is spawned
|
||||
// - the streaming backend filters out data for service X (because it's not exported yet)
|
||||
// - service X is finally exported
|
||||
//
|
||||
// In this situation, the streaming backend does not trigger a refresh of its data.
|
||||
// This means that any events that were supposed to have been received prior to the export are NOT backfilled,
|
||||
// and the watches never see service X spawning.
|
||||
//
|
||||
// We currently have decided to not trigger a stream refresh in this situation due to the potential for a
|
||||
// thundering herd effect (touching exports would cause a re-fetch of all watches for that partition, potentially).
|
||||
// Therefore, this local blocking-query approach exists for agentless.
|
||||
//
|
||||
// It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful,
|
||||
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
|
||||
// This means that while agents should technically have this same issue, they don't experience it with mesh health
|
||||
// watches.
|
||||
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
|
||||
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
|
||||
}
|
||||
|
||||
type serverHealthBlocking struct {
|
||||
deps ServerDataSourceDeps
|
||||
remoteSource proxycfg.Health
|
||||
state *state.Store
|
||||
watchTimeout time.Duration
|
||||
}
|
||||
|
||||
// Notify is mostly a copy of the function in `agent/consul/health_endpoint.go` with a few minor tweaks.
|
||||
// Most notably, some query features unnecessary for mesh have been stripped out.
|
||||
func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||
if args.Datacenter != h.deps.Datacenter {
|
||||
return h.remoteSource.Notify(ctx, args, correlationID, ch)
|
||||
}
|
||||
|
||||
// Verify the arguments
|
||||
if args.ServiceName == "" {
|
||||
return fmt.Errorf("Must provide service name")
|
||||
}
|
||||
if args.EnterpriseMeta.PartitionOrDefault() == acl.WildcardName {
|
||||
return fmt.Errorf("Wildcards are not allowed in the partition field")
|
||||
}
|
||||
|
||||
// Determine the function we'll call
|
||||
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
|
||||
switch {
|
||||
case args.Connect:
|
||||
f = serviceNodesConnect
|
||||
case args.Ingress:
|
||||
f = serviceNodesIngress
|
||||
default:
|
||||
f = serviceNodesDefault
|
||||
}
|
||||
|
||||
filter, err := bexpr.CreateFilter(args.Filter, nil, structs.CheckServiceNode{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var hadResults bool = false
|
||||
return watch.ServerLocalNotify(ctx, correlationID, h.deps.GetStore,
|
||||
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedCheckServiceNodes, error) {
|
||||
// This is necessary so that service export changes are eventually picked up, since
|
||||
// they won't trigger the watch themselves.
|
||||
timeoutCh := make(chan struct{})
|
||||
time.AfterFunc(h.watchTimeout, func() {
|
||||
close(timeoutCh)
|
||||
})
|
||||
ws.Add(timeoutCh)
|
||||
|
||||
authzContext := acl.AuthorizerContext{
|
||||
Peer: args.PeerName,
|
||||
}
|
||||
authz, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
// If we're doing a connect or ingress query, we need read access to the service
|
||||
// we're trying to find proxies for, so check that.
|
||||
if args.Connect || args.Ingress {
|
||||
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
|
||||
// If access was somehow revoked (via token deletion or unexporting), then we clear the
|
||||
// last-known results before triggering an error. This way, the proxies will actually update
|
||||
// their data, rather than holding onto the last-known list of healthy nodes indefinitely.
|
||||
if hadResults {
|
||||
hadResults = false
|
||||
return 0, &structs.IndexedCheckServiceNodes{}, watch.ErrorACLResetData
|
||||
}
|
||||
return 0, nil, acl.ErrPermissionDenied
|
||||
}
|
||||
}
|
||||
|
||||
var thisReply structs.IndexedCheckServiceNodes
|
||||
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
raw, err := filter.Execute(thisReply.Nodes)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
thisReply.Nodes = raw.(structs.CheckServiceNodes)
|
||||
|
||||
// Note: we filter the results with ACLs *after* applying the user-supplied
|
||||
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
|
||||
// results that would be filtered out even if the user did have permission.
|
||||
if err := h.filterACL(&authzContext, args.Token, &thisReply); err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
hadResults = true
|
||||
return thisReply.Index, &thisReply, nil
|
||||
},
|
||||
dispatchBlockingQueryUpdate[*structs.IndexedCheckServiceNodes](ch),
|
||||
)
|
||||
}
|
||||
|
||||
func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token string, subj *structs.IndexedCheckServiceNodes) error {
|
||||
// Get the ACL from the token
|
||||
var entMeta acl.EnterpriseMeta
|
||||
authorizer, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(token, &entMeta, authz)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aclfilter.New(authorizer, h.deps.Logger).Filter(subj)
|
||||
return nil
|
||||
}
|
||||
|
||||
func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
||||
}
|
||||
|
||||
func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
|
||||
}
|
||||
|
||||
func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
||||
}
|
|
@ -0,0 +1,183 @@
|
|||
package proxycfgglue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestServerHealthBlocking(t *testing.T) {
|
||||
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
req = &structs.ServiceSpecificRequest{Datacenter: "dc2"}
|
||||
correlationID = "correlation-id"
|
||||
ch = make(chan<- proxycfg.UpdateEvent)
|
||||
result = errors.New("KABOOM")
|
||||
)
|
||||
|
||||
remoteSource := newMockHealth(t)
|
||||
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
|
||||
|
||||
store := state.NewStateStore(nil)
|
||||
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
|
||||
err := dataSource.Notify(ctx, req, correlationID, ch)
|
||||
require.Equal(t, result, err)
|
||||
})
|
||||
|
||||
t.Run("services notify correctly", func(t *testing.T) {
|
||||
const (
|
||||
datacenter = "dc1"
|
||||
serviceName = "web"
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
store := state.NewStateStore(nil)
|
||||
aclResolver := newStaticResolver(acl.ManageAll())
|
||||
dataSource := ServerHealthBlocking(ServerDataSourceDeps{
|
||||
GetStore: func() Store { return store },
|
||||
Datacenter: datacenter,
|
||||
ACLResolver: aclResolver,
|
||||
Logger: testutil.Logger(t),
|
||||
}, nil, store)
|
||||
dataSource.watchTimeout = 1 * time.Second
|
||||
|
||||
// Watch for all events
|
||||
eventCh := make(chan proxycfg.UpdateEvent)
|
||||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
Datacenter: datacenter,
|
||||
ServiceName: serviceName,
|
||||
}, "", eventCh))
|
||||
|
||||
// Watch for a subset of events
|
||||
filteredCh := make(chan proxycfg.UpdateEvent)
|
||||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
Datacenter: datacenter,
|
||||
ServiceName: serviceName,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Filter: "Service.ID == \"web1\"",
|
||||
},
|
||||
}, "", filteredCh))
|
||||
|
||||
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||
require.Empty(t, result.Nodes)
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
||||
require.Empty(t, result.Nodes)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "register services", func(t *testing.T) {
|
||||
require.NoError(t, store.EnsureRegistration(10, &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: serviceName + "1",
|
||||
Service: serviceName,
|
||||
Port: 80,
|
||||
},
|
||||
}))
|
||||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
|
||||
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: serviceName + "2",
|
||||
Service: serviceName,
|
||||
Port: 81,
|
||||
},
|
||||
}))
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||
require.Len(t, result.Nodes, 2)
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
require.Equal(t, "web1", result.Nodes[0].Service.ID)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "deregister service", func(t *testing.T) {
|
||||
require.NoError(t, store.DeleteService(12, "foo", serviceName+"1", nil, ""))
|
||||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
||||
require.Len(t, result.Nodes, 0)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "acl enforcement", func(t *testing.T) {
|
||||
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
Service: serviceName + "-sidecar-proxy",
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: serviceName,
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
authzDeny := policyAuthorizer(t, ``)
|
||||
authzAllow := policyAuthorizer(t, `
|
||||
node_prefix "" { policy = "read" }
|
||||
service_prefix "web" { policy = "read" }
|
||||
`)
|
||||
|
||||
// Start a stream where insufficient permissions are denied
|
||||
aclDenyCh := make(chan proxycfg.UpdateEvent)
|
||||
aclResolver.SwapAuthorizer(authzDeny)
|
||||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
Connect: true,
|
||||
Datacenter: datacenter,
|
||||
ServiceName: serviceName,
|
||||
}, "", aclDenyCh))
|
||||
require.ErrorContains(t, getEventError(t, aclDenyCh), "Permission denied")
|
||||
|
||||
// Adding ACL permissions will send valid data
|
||||
aclResolver.SwapAuthorizer(authzAllow)
|
||||
time.Sleep(dataSource.watchTimeout)
|
||||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, aclDenyCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
|
||||
|
||||
// Start a stream where sufficient permissions are allowed
|
||||
aclAllowCh := make(chan proxycfg.UpdateEvent)
|
||||
aclResolver.SwapAuthorizer(authzAllow)
|
||||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
Connect: true,
|
||||
Datacenter: datacenter,
|
||||
ServiceName: serviceName,
|
||||
}, "", aclAllowCh))
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
|
||||
|
||||
// Removing ACL permissions will send empty data
|
||||
aclResolver.SwapAuthorizer(authzDeny)
|
||||
time.Sleep(dataSource.watchTimeout)
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
|
||||
require.Len(t, result.Nodes, 0)
|
||||
|
||||
// Adding ACL permissions will send valid data
|
||||
aclResolver.SwapAuthorizer(authzAllow)
|
||||
time.Sleep(dataSource.watchTimeout)
|
||||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
|
||||
require.Len(t, result.Nodes, 1)
|
||||
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
|
||||
})
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue