proxycfg-glue: server-local compiled discovery chain data source

This is the OSS portion of enterprise PR 2236.

Adds a local blocking query-based implementation of the proxycfg.CompiledDiscoveryChain interface.
This commit is contained in:
Daniel Upton 2022-07-12 11:34:14 +01:00 committed by Dan Upton
parent 6d047c453a
commit fbf88d3b19
9 changed files with 345 additions and 121 deletions

View File

@ -4237,6 +4237,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
if server, ok := a.delegate.(*consul.Server); ok {
deps := proxycfgglue.ServerDataSourceDeps{
Datacenter: a.config.Datacenter,
EventPublisher: a.baseDeps.EventPublisher,
ViewStore: a.baseDeps.ViewStore,
Logger: a.logger.Named("proxycfg.server-data-sources"),
@ -4245,6 +4246,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
}
sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps)
sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps)
sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache))
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
}

View File

@ -19,6 +19,7 @@ import (
// ServerDataSourceDeps contains the dependencies needed for sourcing data from
// server-local sources (e.g. materialized views).
type ServerDataSourceDeps struct {
Datacenter string
ViewStore *submatview.Store
EventPublisher *stream.EventPublisher
Logger hclog.Logger

View File

@ -0,0 +1,95 @@
package proxycfgglue
import (
"context"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
// CacheCompiledDiscoveryChain satisfies the proxycfg.CompiledDiscoveryChain
// interface by sourcing data from the agent cache.
func CacheCompiledDiscoveryChain(c *cache.Cache) proxycfg.CompiledDiscoveryChain {
return &cacheProxyDataSource[*structs.DiscoveryChainRequest]{c, cachetype.CompiledDiscoveryChainName}
}
// ServerCompiledDiscoveryChain satisfies the proxycfg.CompiledDiscoveryChain
// interface by sourcing data from a blocking query against the server's state
// store.
//
// Requests for services in remote datacenters will be delegated to the given
// remoteSource (i.e. CacheCompiledDiscoveryChain).
func ServerCompiledDiscoveryChain(deps ServerDataSourceDeps, remoteSource proxycfg.CompiledDiscoveryChain) proxycfg.CompiledDiscoveryChain {
return &serverCompiledDiscoveryChain{deps, remoteSource}
}
type serverCompiledDiscoveryChain struct {
deps ServerDataSourceDeps
remoteSource proxycfg.CompiledDiscoveryChain
}
func (s serverCompiledDiscoveryChain) Notify(ctx context.Context, req *structs.DiscoveryChainRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
if req.Datacenter != s.deps.Datacenter {
return s.remoteSource.Notify(ctx, req, correlationID, ch)
}
entMeta := req.GetEnterpriseMeta()
evalDC := req.EvaluateInDatacenter
if evalDC == "" {
evalDC = s.deps.Datacenter
}
compileReq := discoverychain.CompileRequest{
ServiceName: req.Name,
EvaluateInNamespace: entMeta.NamespaceOrDefault(),
EvaluateInPartition: entMeta.PartitionOrDefault(),
EvaluateInDatacenter: evalDC,
OverrideMeshGateway: req.OverrideMeshGateway,
OverrideProtocol: req.OverrideProtocol,
OverrideConnectTimeout: req.OverrideConnectTimeout,
}
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
func(ws memdb.WatchSet, store Store) (uint64, *structs.DiscoveryChainResponse, error) {
var authzContext acl.AuthorizerContext
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, req.GetEnterpriseMeta(), &authzContext)
if err != nil {
return 0, nil, err
}
if err := authz.ToAllowAuthorizer().ServiceReadAllowed(req.Name, &authzContext); err != nil {
// TODO(agentless): the agent cache handles acl.IsErrNotFound specially to
// prevent endlessly retrying if an ACL token is deleted. We should probably
// do this in watch.ServerLocalNotify too.
return 0, nil, err
}
index, chain, entries, err := store.ServiceDiscoveryChain(ws, req.Name, entMeta, compileReq)
if err != nil {
return 0, nil, err
}
rsp := &structs.DiscoveryChainResponse{
Chain: chain,
QueryMeta: structs.QueryMeta{
Backend: structs.QueryBackendBlocking,
Index: index,
},
}
// TODO(boxofrad): Check with @mkeeler that this is the correct thing to do.
if entries.IsEmpty() {
return index, rsp, watch.ErrorNotFound
}
return index, rsp, nil
},
dispatchBlockingQueryUpdate[*structs.DiscoveryChainResponse](ch),
)
}

View File

@ -0,0 +1,114 @@
package proxycfgglue
import (
"context"
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
func TestServerCompiledDiscoveryChain(t *testing.T) {
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
var (
ctx = context.Background()
req = &structs.DiscoveryChainRequest{Datacenter: "dc2"}
correlationID = "correlation-id"
ch = make(chan<- proxycfg.UpdateEvent)
result = errors.New("KABOOM")
)
remoteSource := newMockCompiledDiscoveryChain(t)
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
dataSource := ServerCompiledDiscoveryChain(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
err := dataSource.Notify(ctx, req, correlationID, ch)
require.Equal(t, result, err)
})
t.Run("local queries are served from the state store", func(t *testing.T) {
const (
serviceName = "web"
datacenter = "dc1"
index = 123
)
store := state.NewStateStore(nil)
require.NoError(t, store.CASetConfig(index, &structs.CAConfiguration{ClusterID: "cluster-id"}))
require.NoError(t, store.EnsureConfigEntry(index, &structs.ServiceConfigEntry{
Name: serviceName,
Kind: structs.ServiceDefaults,
}))
req := &structs.DiscoveryChainRequest{
Name: serviceName,
Datacenter: datacenter,
}
resolver := newStaticResolver(
policyAuthorizer(t, fmt.Sprintf(`service "%s" { policy = "read" }`, serviceName)),
)
dataSource := ServerCompiledDiscoveryChain(ServerDataSourceDeps{
ACLResolver: resolver,
Datacenter: datacenter,
GetStore: func() Store { return store },
}, nil)
eventCh := make(chan proxycfg.UpdateEvent)
err := dataSource.Notify(context.Background(), req, "", eventCh)
require.NoError(t, err)
// Check we get an event with the initial state.
result := getEventResult[*structs.DiscoveryChainResponse](t, eventCh)
require.NotNil(t, result.Chain)
// Change the protocol to HTTP and check we get a recompiled chain.
require.NoError(t, store.EnsureConfigEntry(index+1, &structs.ServiceConfigEntry{
Name: serviceName,
Kind: structs.ServiceDefaults,
Protocol: "http",
}))
result = getEventResult[*structs.DiscoveryChainResponse](t, eventCh)
require.NotNil(t, result.Chain)
require.Equal(t, "http", result.Chain.Protocol)
// Revoke access to the service.
resolver.SwapAuthorizer(acl.DenyAll())
// Write another config entry.
require.NoError(t, store.EnsureConfigEntry(index+2, &structs.ServiceConfigEntry{
Name: serviceName,
Kind: structs.ServiceDefaults,
MaxInboundConnections: 1,
}))
// Should no longer receive events for this service.
expectNoEvent(t, eventCh)
})
}
func newMockCompiledDiscoveryChain(t *testing.T) *mockCompiledDiscoveryChain {
mock := &mockCompiledDiscoveryChain{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
type mockCompiledDiscoveryChain struct {
mock.Mock
}
func (m *mockCompiledDiscoveryChain) Notify(ctx context.Context, req *structs.DiscoveryChainRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
return m.Called(ctx, req, correlationID, ch).Error(0)
}

View File

@ -8,6 +8,8 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/rpcclient/health"
@ -20,6 +22,7 @@ type Store interface {
watch.StateStore
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
}
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
@ -28,12 +31,6 @@ func CacheCARoots(c *cache.Cache) proxycfg.CARoots {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ConnectCARootName}
}
// CacheCompiledDiscoveryChain satisfies the proxycfg.CompiledDiscoveryChain
// interface by sourcing data from the agent cache.
func CacheCompiledDiscoveryChain(c *cache.Cache) proxycfg.CompiledDiscoveryChain {
return &cacheProxyDataSource[*structs.DiscoveryChainRequest]{c, cachetype.CompiledDiscoveryChainName}
}
// CacheConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing
// data from the agent cache.
func CacheConfigEntry(c *cache.Cache) proxycfg.ConfigEntry {

View File

@ -0,0 +1,34 @@
package proxycfgglue
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg"
)
func getEventResult[ResultType any](t *testing.T, eventCh <-chan proxycfg.UpdateEvent) ResultType {
t.Helper()
select {
case event := <-eventCh:
require.NoError(t, event.Err, "event should not have an error")
result, ok := event.Result.(ResultType)
require.Truef(t, ok, "unexpected result type: %T", event.Result)
return result
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
panic("this should never be reached")
}
func expectNoEvent(t *testing.T, eventCh <-chan proxycfg.UpdateEvent) {
select {
case <-eventCh:
t.Fatal("expected no event")
case <-time.After(100 * time.Millisecond):
}
}

View File

@ -3,7 +3,6 @@ package proxycfgglue
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
@ -62,7 +61,7 @@ func TestServerIntentionUpstreams(t *testing.T) {
authz := policyAuthorizer(t, `service "db" { policy = "read" }`)
dataSource := ServerIntentionUpstreams(ServerDataSourceDeps{
ACLResolver: staticResolver{authz},
ACLResolver: newStaticResolver(authz),
GetStore: func() Store { return store },
})
@ -70,28 +69,16 @@ func TestServerIntentionUpstreams(t *testing.T) {
err := dataSource.Notify(ctx, &structs.ServiceSpecificRequest{ServiceName: serviceName}, "", ch)
require.NoError(t, err)
select {
case event := <-ch:
result, ok := event.Result.(*structs.IndexedServiceList)
require.Truef(t, ok, "expected IndexedServiceList, got: %T", event.Result)
require.Len(t, result.Services, 0)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
result := getEventResult[*structs.IndexedServiceList](t, ch)
require.Len(t, result.Services, 0)
// Create an allow intention for the db service. This should *not* be filtered
// out because the ACL token *does* have read access on it.
createIntention("db")
select {
case event := <-ch:
result, ok := event.Result.(*structs.IndexedServiceList)
require.Truef(t, ok, "expected IndexedServiceList, got: %T", event.Result)
require.Len(t, result.Services, 1)
require.Equal(t, "db", result.Services[0].Name)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
result = getEventResult[*structs.IndexedServiceList](t, ch)
require.Len(t, result.Services, 1)
require.Equal(t, "db", result.Services[0].Name)
}
func disableLegacyIntentions(t *testing.T, store *state.Store) {

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestServerIntentions_Enterprise(t *testing.T) {
@ -39,7 +40,7 @@ func TestServerIntentions_Enterprise(t *testing.T) {
go publisher.Run(ctx)
intentions := ServerIntentions(ServerDataSourceDeps{
ACLResolver: staticResolver{acl.ManageAll()},
ACLResolver: newStaticResolver(acl.ManageAll()),
ViewStore: store,
EventPublisher: publisher,
Logger: logger,
@ -51,37 +52,29 @@ func TestServerIntentions_Enterprise(t *testing.T) {
ServiceName: serviceName,
}, "", eventCh))
// Wait for the initial snapshots.
select {
case <-eventCh:
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
testutil.RunStep(t, "initial snapshot", func(t *testing.T) {
getEventResult[structs.Intentions](t, eventCh)
})
// Publish a namespace wildcard intention.
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 1,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Upsert,
Value: &structs.ServiceIntentionsConfigEntry{
Name: structs.WildcardSpecifier,
EnterpriseMeta: *acl.WildcardEnterpriseMeta(),
Sources: []*structs.SourceIntention{
{Name: structs.WildcardSpecifier, Action: structs.IntentionActionAllow, Precedence: 1},
testutil.RunStep(t, "publish a namespace-wildcard partition", func(t *testing.T) {
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 1,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Upsert,
Value: &structs.ServiceIntentionsConfigEntry{
Name: structs.WildcardSpecifier,
EnterpriseMeta: *acl.WildcardEnterpriseMeta(),
Sources: []*structs.SourceIntention{
{Name: structs.WildcardSpecifier, Action: structs.IntentionActionAllow, Precedence: 1},
},
},
},
},
},
})
})
select {
case event := <-eventCh:
result, ok := event.Result.(structs.Intentions)
require.Truef(t, ok, "expected Intentions, got: %T", event.Result)
result := getEventResult[structs.Intentions](t, eventCh)
require.Len(t, result, 1)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
})
}

View File

@ -2,6 +2,7 @@ package proxycfgglue
import (
"context"
"sync"
"testing"
"time"
@ -16,6 +17,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestServerIntentions(t *testing.T) {
@ -39,7 +41,7 @@ func TestServerIntentions(t *testing.T) {
go publisher.Run(ctx)
intentions := ServerIntentions(ServerDataSourceDeps{
ACLResolver: staticResolver{acl.ManageAll()},
ACLResolver: newStaticResolver(acl.ManageAll()),
ViewStore: store,
EventPublisher: publisher,
Logger: logger,
@ -51,64 +53,53 @@ func TestServerIntentions(t *testing.T) {
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}, "", eventCh))
// Wait for the initial snapshots.
select {
case <-eventCh:
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
testutil.RunStep(t, "initial snapshot", func(t *testing.T) {
getEventResult[structs.Intentions](t, eventCh)
})
// Publish an explicit intention on the service.
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 1,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Upsert,
Value: &structs.ServiceIntentionsConfigEntry{
Name: serviceName,
Sources: []*structs.SourceIntention{
{Name: "db", Action: structs.IntentionActionAllow, Precedence: 1},
testutil.RunStep(t, "publishing an explicit intention", func(t *testing.T) {
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 1,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Upsert,
Value: &structs.ServiceIntentionsConfigEntry{
Name: serviceName,
Sources: []*structs.SourceIntention{
{Name: "db", Action: structs.IntentionActionAllow, Precedence: 1},
},
},
},
},
},
})
})
select {
case event := <-eventCh:
result, ok := event.Result.(structs.Intentions)
require.Truef(t, ok, "expected Intentions, got: %T", event.Result)
result := getEventResult[structs.Intentions](t, eventCh)
require.Len(t, result, 1)
intention := result[0]
require.Equal(t, intention.DestinationName, serviceName)
require.Equal(t, intention.SourceName, "db")
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
})
// Publish a wildcard intention.
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 2,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Upsert,
Value: &structs.ServiceIntentionsConfigEntry{
Name: structs.WildcardSpecifier,
Sources: []*structs.SourceIntention{
{Name: structs.WildcardSpecifier, Action: structs.IntentionActionAllow, Precedence: 0},
testutil.RunStep(t, "publishing a wildcard intention", func(t *testing.T) {
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 2,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Upsert,
Value: &structs.ServiceIntentionsConfigEntry{
Name: structs.WildcardSpecifier,
Sources: []*structs.SourceIntention{
{Name: structs.WildcardSpecifier, Action: structs.IntentionActionAllow, Precedence: 0},
},
},
},
},
},
})
})
select {
case event := <-eventCh:
result, ok := event.Result.(structs.Intentions)
require.Truef(t, ok, "expected Intentions, got: %T", event.Result)
result := getEventResult[structs.Intentions](t, eventCh)
require.Len(t, result, 2)
a := result[0]
@ -118,38 +109,48 @@ func TestServerIntentions(t *testing.T) {
b := result[1]
require.Equal(t, b.DestinationName, structs.WildcardSpecifier)
require.Equal(t, b.SourceName, structs.WildcardSpecifier)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
// Publish a delete event and observe the intention is removed from the results.
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 3,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Delete,
Value: &structs.ServiceIntentionsConfigEntry{
Name: serviceName,
},
},
},
})
select {
case event := <-eventCh:
result, ok := event.Result.(structs.Intentions)
require.Truef(t, ok, "expected Intentions, got: %T", event.Result)
testutil.RunStep(t, "publishing a delete event", func(t *testing.T) {
publisher.Publish([]stream.Event{
{
Topic: pbsubscribe.Topic_ServiceIntentions,
Index: index + 3,
Payload: state.EventPayloadConfigEntry{
Op: pbsubscribe.ConfigEntryUpdate_Delete,
Value: &structs.ServiceIntentionsConfigEntry{
Name: serviceName,
},
},
},
})
result := getEventResult[structs.Intentions](t, eventCh)
require.Len(t, result, 1)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for event")
}
})
}
type staticResolver struct {
mu sync.Mutex
authorizer acl.Authorizer
}
func (r staticResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzContext *acl.AuthorizerContext) (resolver.Result, error) {
func newStaticResolver(authz acl.Authorizer) *staticResolver {
resolver := new(staticResolver)
resolver.SwapAuthorizer(authz)
return resolver
}
func (r *staticResolver) SwapAuthorizer(authz acl.Authorizer) {
r.mu.Lock()
defer r.mu.Unlock()
r.authorizer = authz
}
func (r *staticResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzContext *acl.AuthorizerContext) (resolver.Result, error) {
r.mu.Lock()
defer r.mu.Unlock()
return resolver.Result{Authorizer: r.authorizer}, nil
}