mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 23:05:28 +00:00
04edace1de
Fix issue with streaming service health watches.

This commit fixes an issue where the health streams were unaware of service export changes. Whenever an exported-services config entry is modified, it is effectively an ACL change.

The bug would be triggered by the following situation:
- no services are exported
- an upstream watch to service X is spawned
- the streaming backend filters out data for service X (due to lack of exports)
- service X is finally exported

In the situation above, the streaming backend does not trigger a refresh of its data. This means that any events that were supposed to have been received prior to the export are NOT backfilled, and the watches never see service X spawning.

We currently have decided to not trigger a stream refresh in this situation due to the potential for a thundering herd effect (touching exports would cause a re-fetch of all watches for that partition, potentially). Therefore, a local blocking-query approach was added by this commit for agentless.

It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful, because proxycfg has a `req.Source.Node != ""` condition which prevents the `streamingEnabled` check from passing. This means that while agents should technically have this same issue, they don't experience it with mesh health watches.

Note that this is a temporary fix that solves the issue for proxycfg, but not service-discovery use cases.
184 lines
6.2 KiB
Go
184 lines
6.2 KiB
Go
package proxycfgglue
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/sdk/testutil"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestServerHealthBlocking(t *testing.T) {
|
|
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
|
|
var (
|
|
ctx = context.Background()
|
|
req = &structs.ServiceSpecificRequest{Datacenter: "dc2"}
|
|
correlationID = "correlation-id"
|
|
ch = make(chan<- proxycfg.UpdateEvent)
|
|
result = errors.New("KABOOM")
|
|
)
|
|
|
|
remoteSource := newMockHealth(t)
|
|
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
|
|
|
|
store := state.NewStateStore(nil)
|
|
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
|
|
err := dataSource.Notify(ctx, req, correlationID, ch)
|
|
require.Equal(t, result, err)
|
|
})
|
|
|
|
t.Run("services notify correctly", func(t *testing.T) {
|
|
const (
|
|
datacenter = "dc1"
|
|
serviceName = "web"
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
store := state.NewStateStore(nil)
|
|
aclResolver := newStaticResolver(acl.ManageAll())
|
|
dataSource := ServerHealthBlocking(ServerDataSourceDeps{
|
|
GetStore: func() Store { return store },
|
|
Datacenter: datacenter,
|
|
ACLResolver: aclResolver,
|
|
Logger: testutil.Logger(t),
|
|
}, nil, store)
|
|
dataSource.watchTimeout = 1 * time.Second
|
|
|
|
// Watch for all events
|
|
eventCh := make(chan proxycfg.UpdateEvent)
|
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: datacenter,
|
|
ServiceName: serviceName,
|
|
}, "", eventCh))
|
|
|
|
// Watch for a subset of events
|
|
filteredCh := make(chan proxycfg.UpdateEvent)
|
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: datacenter,
|
|
ServiceName: serviceName,
|
|
QueryOptions: structs.QueryOptions{
|
|
Filter: "Service.ID == \"web1\"",
|
|
},
|
|
}, "", filteredCh))
|
|
|
|
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
|
require.Empty(t, result.Nodes)
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
|
require.Empty(t, result.Nodes)
|
|
})
|
|
|
|
testutil.RunStep(t, "register services", func(t *testing.T) {
|
|
require.NoError(t, store.EnsureRegistration(10, &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
ID: serviceName + "1",
|
|
Service: serviceName,
|
|
Port: 80,
|
|
},
|
|
}))
|
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
|
require.Len(t, result.Nodes, 1)
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
|
require.Len(t, result.Nodes, 1)
|
|
|
|
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
ID: serviceName + "2",
|
|
Service: serviceName,
|
|
Port: 81,
|
|
},
|
|
}))
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
|
require.Len(t, result.Nodes, 2)
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
|
require.Len(t, result.Nodes, 1)
|
|
require.Equal(t, "web1", result.Nodes[0].Service.ID)
|
|
})
|
|
|
|
testutil.RunStep(t, "deregister service", func(t *testing.T) {
|
|
require.NoError(t, store.DeleteService(12, "foo", serviceName+"1", nil, ""))
|
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
|
require.Len(t, result.Nodes, 1)
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh)
|
|
require.Len(t, result.Nodes, 0)
|
|
})
|
|
|
|
testutil.RunStep(t, "acl enforcement", func(t *testing.T) {
|
|
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
Service: serviceName + "-sidecar-proxy",
|
|
Kind: structs.ServiceKindConnectProxy,
|
|
Proxy: structs.ConnectProxyConfig{
|
|
DestinationServiceName: serviceName,
|
|
},
|
|
},
|
|
}))
|
|
|
|
authzDeny := policyAuthorizer(t, ``)
|
|
authzAllow := policyAuthorizer(t, `
|
|
node_prefix "" { policy = "read" }
|
|
service_prefix "web" { policy = "read" }
|
|
`)
|
|
|
|
// Start a stream where insufficient permissions are denied
|
|
aclDenyCh := make(chan proxycfg.UpdateEvent)
|
|
aclResolver.SwapAuthorizer(authzDeny)
|
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Connect: true,
|
|
Datacenter: datacenter,
|
|
ServiceName: serviceName,
|
|
}, "", aclDenyCh))
|
|
require.ErrorContains(t, getEventError(t, aclDenyCh), "Permission denied")
|
|
|
|
// Adding ACL permissions will send valid data
|
|
aclResolver.SwapAuthorizer(authzAllow)
|
|
time.Sleep(dataSource.watchTimeout)
|
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, aclDenyCh)
|
|
require.Len(t, result.Nodes, 1)
|
|
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
|
|
|
|
// Start a stream where sufficient permissions are allowed
|
|
aclAllowCh := make(chan proxycfg.UpdateEvent)
|
|
aclResolver.SwapAuthorizer(authzAllow)
|
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Connect: true,
|
|
Datacenter: datacenter,
|
|
ServiceName: serviceName,
|
|
}, "", aclAllowCh))
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
|
|
require.Len(t, result.Nodes, 1)
|
|
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
|
|
|
|
// Removing ACL permissions will send empty data
|
|
aclResolver.SwapAuthorizer(authzDeny)
|
|
time.Sleep(dataSource.watchTimeout)
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
|
|
require.Len(t, result.Nodes, 0)
|
|
|
|
// Adding ACL permissions will send valid data
|
|
aclResolver.SwapAuthorizer(authzAllow)
|
|
time.Sleep(dataSource.watchTimeout)
|
|
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh)
|
|
require.Len(t, result.Nodes, 1)
|
|
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service)
|
|
})
|
|
})
|
|
}
|