mirror of https://github.com/status-im/consul.git
Proxy Config Manager (#4729)
* Proxy Config Manager This component watches for local state changes on the agent and ensures that each service registered locally with Kind == connect-proxy has it's state being actively populated in the cache. This serves two purposes: 1. For the built-in proxy, it ensures that the state needed to accept connections is available in RAM shortly after registration and likely before the proxy actually starts accepting traffic. 2. For (future - next PR) xDS server and other possible future proxies that require _push_ based config discovery, this provides a mechanism to subscribe and be notified about updates to a proxy instance's config including upstream service discovery results. * Address review comments * Better comments; Better delivery of latest snapshot for slow watchers; Embed Config * Comment typos * Add upstream Stringer for funsies
This commit is contained in:
parent
96b9b95a19
commit
0f27ffd163
|
@ -190,6 +190,17 @@ type State struct {
|
|||
// tokens contains the ACL tokens
|
||||
tokens *token.Store
|
||||
|
||||
// notifyHandlers is a map of registered channel listeners that are sent
|
||||
// messages whenever state changes occur. For now these events only include
|
||||
// service registration and deregistration since that is all that is needed
|
||||
// but the same mechanism could be used for other state changes.
|
||||
//
|
||||
// Note that we haven't refactored managedProxyHandlers into this mechanism
|
||||
// yet because that is soon to be deprecated and removed so it's easier to
|
||||
// just leave them separate until managed proxies are removed entirely. Any
|
||||
// future notifications should re-use this mechanism though.
|
||||
notifyHandlers map[chan<- struct{}]struct{}
|
||||
|
||||
// managedProxies is a map of all managed connect proxies registered locally on
|
||||
// this agent. This is NOT kept in sync with servers since it's agent-local
|
||||
// config only. Proxy instances have separate service registrations in the
|
||||
|
@ -215,6 +226,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
|
|||
checkAliases: make(map[string]map[types.CheckID]chan<- struct{}),
|
||||
metadata: make(map[string]string),
|
||||
tokens: tokens,
|
||||
notifyHandlers: make(map[chan<- struct{}]struct{}),
|
||||
managedProxies: make(map[string]*ManagedProxy),
|
||||
managedProxyHandlers: make(map[chan<- struct{}]struct{}),
|
||||
}
|
||||
|
@ -290,6 +302,7 @@ func (l *State) RemoveService(id string) error {
|
|||
s.WatchCh = nil
|
||||
}
|
||||
l.TriggerSyncChanges()
|
||||
l.broadcastUpdateLocked()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -355,6 +368,7 @@ func (l *State) SetServiceState(s *ServiceState) {
|
|||
}
|
||||
|
||||
l.TriggerSyncChanges()
|
||||
l.broadcastUpdateLocked()
|
||||
}
|
||||
|
||||
// ServiceStates returns a shallow copy of all service state records.
|
||||
|
@ -683,11 +697,18 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token,
|
|||
Service: target.Service + "-proxy",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: target.Service,
|
||||
LocalServiceAddress: cfg.LocalServiceAddress,
|
||||
LocalServicePort: cfg.LocalServicePort,
|
||||
},
|
||||
Address: cfg.BindAddress,
|
||||
Port: cfg.BindPort,
|
||||
}
|
||||
|
||||
// Set default port now while the target is known
|
||||
if svc.Proxy.LocalServicePort < 1 {
|
||||
svc.Proxy.LocalServicePort = target.Port
|
||||
}
|
||||
|
||||
// Lock now. We can't lock earlier as l.Service would deadlock and shouldn't
|
||||
// anyway to minimise the critical section.
|
||||
l.Lock()
|
||||
|
@ -821,6 +842,41 @@ func (l *State) Proxies() map[string]*ManagedProxy {
|
|||
return m
|
||||
}
|
||||
|
||||
// broadcastUpdateLocked assumes l is locked and delivers an update to all
|
||||
// registered watchers.
|
||||
func (l *State) broadcastUpdateLocked() {
|
||||
for ch := range l.notifyHandlers {
|
||||
// Do not block
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Notify will register a channel to receive messages when the local state
|
||||
// changes. Only service add/remove are supported for now. See notes on
|
||||
// l.notifyHandlers for more details.
|
||||
//
|
||||
// This will not block on channel send so ensure the channel has a buffer. Note
|
||||
// that any buffer size is generally fine since actual data is not sent over the
|
||||
// channel, so a dropped send due to a full buffer does not result in any loss
|
||||
// of data. The fact that a buffer already contains a notification means that
|
||||
// the receiver will still be notified that changes occurred.
|
||||
func (l *State) Notify(ch chan<- struct{}) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
l.notifyHandlers[ch] = struct{}{}
|
||||
}
|
||||
|
||||
// StopNotify will deregister a channel receiving state change notifications.
|
||||
// Pair this with all calls to Notify to clean up state.
|
||||
func (l *State) StopNotify(ch chan<- struct{}) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
delete(l.notifyHandlers, ch)
|
||||
}
|
||||
|
||||
// NotifyProxy will register a channel to receive messages when the
|
||||
// configuration or set of proxies changes. This will not block on
|
||||
// channel send so ensure the channel has a buffer. Note that any buffer
|
||||
|
|
|
@ -1901,6 +1901,67 @@ func checksInSync(state *local.State, wantChecks int) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func TestState_Notify(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
state := local.NewState(local.Config{},
|
||||
log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
||||
|
||||
// Stub state syncing
|
||||
state.TriggerSyncChanges = func() {}
|
||||
|
||||
require := require.New(t)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Register a notifier
|
||||
notifyCh := make(chan struct{}, 1)
|
||||
state.Notify(notifyCh)
|
||||
defer state.StopNotify(notifyCh)
|
||||
assert.Empty(notifyCh)
|
||||
drainCh(notifyCh)
|
||||
|
||||
// Add a service
|
||||
err := state.AddService(&structs.NodeService{
|
||||
Service: "web",
|
||||
}, "fake-token-web")
|
||||
require.NoError(err)
|
||||
|
||||
// Should have a notification
|
||||
assert.NotEmpty(notifyCh)
|
||||
drainCh(notifyCh)
|
||||
|
||||
// Re-Add same service
|
||||
err = state.AddService(&structs.NodeService{
|
||||
Service: "web",
|
||||
Port: 4444,
|
||||
}, "fake-token-web")
|
||||
require.NoError(err)
|
||||
|
||||
// Should have a notification
|
||||
assert.NotEmpty(notifyCh)
|
||||
drainCh(notifyCh)
|
||||
|
||||
// Remove service
|
||||
require.NoError(state.RemoveService("web"))
|
||||
|
||||
// Should have a notification
|
||||
assert.NotEmpty(notifyCh)
|
||||
drainCh(notifyCh)
|
||||
|
||||
// Stopping should... stop
|
||||
state.StopNotify(notifyCh)
|
||||
|
||||
// Add a service
|
||||
err = state.AddService(&structs.NodeService{
|
||||
Service: "web",
|
||||
}, "fake-token-web")
|
||||
require.NoError(err)
|
||||
|
||||
// Should NOT have a notification
|
||||
assert.Empty(notifyCh)
|
||||
drainCh(notifyCh)
|
||||
}
|
||||
|
||||
func TestStateProxyManagement(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -0,0 +1,350 @@
|
|||
package proxycfg
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrStopped is returned from Run if the manager instance has already been
|
||||
// stopped.
|
||||
ErrStopped = errors.New("manager stopped")
|
||||
|
||||
// ErrStarted is returned from Run if the manager instance has already run.
|
||||
ErrStarted = errors.New("manager was already run")
|
||||
)
|
||||
|
||||
// CancelFunc is a type for a returned function that can be called to cancel a
|
||||
// watch.
|
||||
type CancelFunc func()
|
||||
|
||||
// Manager is a component that integrates into the agent and manages Connect
|
||||
// proxy configuration state. This should not be confused with the deprecated
|
||||
// "managed proxy" concept where the agent supervises the actual proxy process.
|
||||
// proxycfg.Manager is oblivious to the distinction and manages state for any
|
||||
// service registered with Kind == connect-proxy.
|
||||
//
|
||||
// The Manager ensures that any Connect proxy registered on the agent has all
|
||||
// the state it needs cached locally via the agent cache. State includes
|
||||
// certificates, intentions, and service discovery results for any declared
|
||||
// upstreams. See package docs for more detail.
|
||||
type Manager struct {
|
||||
ManagerConfig
|
||||
|
||||
// stateCh is notified for any service changes in local state. We only use
|
||||
// this to trigger on _new_ service addition since it has no data and we don't
|
||||
// want to maintain a full copy of the state in order to diff and figure out
|
||||
// what changed. Luckily each service has it's own WatchCh so we can figure
|
||||
// out changes and removals with those efficiently.
|
||||
stateCh chan struct{}
|
||||
|
||||
mu sync.Mutex
|
||||
started bool
|
||||
proxies map[string]*state
|
||||
watchers map[string]map[uint64]chan *ConfigSnapshot
|
||||
}
|
||||
|
||||
// ManagerConfig holds the required external dependencies for a Manager
|
||||
// instance. All fields must be set to something valid or the manager will
|
||||
// panic. The ManagerConfig is passed by value to NewManager so the passed value
|
||||
// can be mutated safely.
|
||||
type ManagerConfig struct {
|
||||
// Cache is the agent's cache instance that can be used to retrieve, store and
|
||||
// monitor state for the proxies.
|
||||
Cache *cache.Cache
|
||||
// state is the agent's local state to be watched for new proxy registrations.
|
||||
State *local.State
|
||||
// source describes the current agent's identity, it's used directly for
|
||||
// prepared query discovery but also indirectly as a way to pass current
|
||||
// Datacenter name into other request types that need it. This is sufficient
|
||||
// for now and cleaner than passing the entire RuntimeConfig.
|
||||
Source *structs.QuerySource
|
||||
// logger is the agent's logger to be used for logging logs.
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// NewManager constructs a manager from the provided agent cache.
|
||||
func NewManager(cfg ManagerConfig) (*Manager, error) {
|
||||
if cfg.Cache == nil || cfg.State == nil || cfg.Source == nil ||
|
||||
cfg.Logger == nil {
|
||||
return nil, errors.New("all ManagerConfig fields must be provided")
|
||||
}
|
||||
m := &Manager{
|
||||
ManagerConfig: cfg,
|
||||
// Single item buffer is enough since there is no data transferred so this
|
||||
// is "level triggering" and we can't miss actual data.
|
||||
stateCh: make(chan struct{}, 1),
|
||||
proxies: make(map[string]*state),
|
||||
watchers: make(map[string]map[uint64]chan *ConfigSnapshot),
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Run is the long-running method that handles state syncing. It should be run
|
||||
// in it's own goroutine and will continue until a fatal error is hit or Close
|
||||
// is called. Run will return an error if it is called more than once, or called
|
||||
// after Close.
|
||||
func (m *Manager) Run() error {
|
||||
m.mu.Lock()
|
||||
alreadyStarted := m.started
|
||||
m.started = true
|
||||
stateCh := m.stateCh
|
||||
m.mu.Unlock()
|
||||
|
||||
// Protect against multiple Run calls.
|
||||
if alreadyStarted {
|
||||
return ErrStarted
|
||||
}
|
||||
|
||||
// Protect against being run after Close.
|
||||
if stateCh == nil {
|
||||
return ErrStopped
|
||||
}
|
||||
|
||||
// Register for notifications about state changes
|
||||
m.State.Notify(stateCh)
|
||||
defer m.State.StopNotify(stateCh)
|
||||
|
||||
for {
|
||||
m.mu.Lock()
|
||||
|
||||
// Traverse the local state and ensure all proxy services are registered
|
||||
services := m.State.Services()
|
||||
for svcID, svc := range services {
|
||||
if svc.Kind != structs.ServiceKindConnectProxy {
|
||||
continue
|
||||
}
|
||||
// TODO(banks): need to work out when to default some stuff. For example
|
||||
// Proxy.LocalServicePort is practically necessary for any sidecar and can
|
||||
// default to the port of the sidecar service, but only if it's already
|
||||
// registered and once we get past here, we don't have enough context to
|
||||
// know that so we'd need to set it here if not during registration of the
|
||||
// proxy service. Sidecar Service and managed proxies in the interim can
|
||||
// do that, but we should validate more generally that that is always
|
||||
// true.
|
||||
err := m.ensureProxyServiceLocked(svc, m.State.ServiceToken(svcID))
|
||||
if err != nil {
|
||||
m.Logger.Printf("[ERR] failed to watch proxy service %s: %s", svc.ID,
|
||||
err)
|
||||
}
|
||||
}
|
||||
|
||||
// Now see if any proxies were removed
|
||||
for proxyID := range m.proxies {
|
||||
if _, ok := services[proxyID]; !ok {
|
||||
// Remove them
|
||||
m.removeProxyServiceLocked(proxyID)
|
||||
}
|
||||
}
|
||||
|
||||
m.mu.Unlock()
|
||||
|
||||
// Wait for a state change
|
||||
_, ok := <-stateCh
|
||||
if !ok {
|
||||
// Stopped
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ensureProxyServiceLocked adds or changes the proxy to our state.
|
||||
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string) error {
|
||||
state, ok := m.proxies[ns.ID]
|
||||
|
||||
if ok {
|
||||
if !state.Changed(ns, token) {
|
||||
// No change
|
||||
return nil
|
||||
}
|
||||
|
||||
// We are updating the proxy, close it's old state
|
||||
state.Close()
|
||||
}
|
||||
|
||||
var err error
|
||||
state, err = newState(ns, token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the necessary dependencies
|
||||
state.logger = m.Logger
|
||||
state.cache = m.Cache
|
||||
state.source = m.Source
|
||||
|
||||
ch, err := state.Watch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.proxies[ns.ID] = state
|
||||
|
||||
// Start a goroutine that will wait for changes and broadcast them to watchers.
|
||||
go func(ch <-chan ConfigSnapshot) {
|
||||
// Run until ch is closed
|
||||
for snap := range ch {
|
||||
m.notify(&snap)
|
||||
}
|
||||
}(ch)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeProxyService is called when a service deregisters and frees all
|
||||
// resources for that service.
|
||||
func (m *Manager) removeProxyServiceLocked(proxyID string) {
|
||||
state, ok := m.proxies[proxyID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Closing state will let the goroutine we started in Ensure finish since
|
||||
// watch chan is closed.
|
||||
state.Close()
|
||||
delete(m.proxies, proxyID)
|
||||
|
||||
// We intentionally leave potential watchers hanging here - there is no new
|
||||
// config for them and closing their channels might be indistinguishable from
|
||||
// an error that they should retry. We rely for them to eventually give up
|
||||
// (because they are in fact not running any more) and so the watches be
|
||||
// cleaned up naturally.
|
||||
}
|
||||
|
||||
func (m *Manager) notify(snap *ConfigSnapshot) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
watchers, ok := m.watchers[snap.ProxyID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
for _, ch := range watchers {
|
||||
// Attempt delivery but don't let slow consumers block us forever. They
|
||||
// might miss updates but it's better than breaking everything.
|
||||
//
|
||||
// TODO(banks): should we close their chan here to force them to eventually
|
||||
// notice they are too slow? Not sure if it really helps.
|
||||
select {
|
||||
case ch <- snap:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// deliverLatest delivers the snapshot to a watch chan. If the delivery blocks,
|
||||
// it will drain the chan and then re-attempt delivery so that a slow consumer
|
||||
// gets the latest config earlier. This MUST be called from a method where m.mu
|
||||
// is held to be safe since it assumes we are the only goroutine sending on ch.
|
||||
func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan *ConfigSnapshot) {
|
||||
// Send if chan is empty
|
||||
select {
|
||||
case ch <- snap:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Not empty, drain the chan of older snapshots and redeliver. For now we only
|
||||
// use 1-buffered chans but this will still work if we change that later.
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
continue
|
||||
default:
|
||||
break OUTER
|
||||
}
|
||||
}
|
||||
|
||||
// Now send again
|
||||
select {
|
||||
case ch <- snap:
|
||||
return
|
||||
default:
|
||||
// This should not be possible since we should be the only sender, enforced
|
||||
// by m.mu but error and drop the update rather than panic.
|
||||
m.Logger.Printf("[ERR] proxycfg: failed to deliver ConfigSnapshot to %q",
|
||||
snap.ProxyID)
|
||||
}
|
||||
}
|
||||
|
||||
// Watch registers a watch on a proxy. It might not exist yet in which case this
|
||||
// will not fail, but no updates will be delivered until the proxy is
|
||||
// registered. If there is already a valid snapshot in memory, it will be
|
||||
// delivered immediately.
|
||||
func (m *Manager) Watch(proxyID string) (<-chan *ConfigSnapshot, CancelFunc) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
// This buffering is crucial otherwise we'd block immediately trying to
|
||||
// deliver the current snapshot below if we already have one.
|
||||
ch := make(chan *ConfigSnapshot, 1)
|
||||
watchers, ok := m.watchers[proxyID]
|
||||
if !ok {
|
||||
watchers = make(map[uint64]chan *ConfigSnapshot)
|
||||
}
|
||||
idx := uint64(len(watchers))
|
||||
watchers[idx] = ch
|
||||
m.watchers[proxyID] = watchers
|
||||
|
||||
// Deliver the current snapshot immediately if there is one ready
|
||||
if state, ok := m.proxies[proxyID]; ok {
|
||||
if snap := state.CurrentSnapshot(); snap != nil {
|
||||
// We rely on ch being buffered above and that it's not been passed
|
||||
// anywhere so we must be the only writer so this will never block and
|
||||
// deadlock.
|
||||
ch <- snap
|
||||
}
|
||||
}
|
||||
|
||||
return ch, func() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.closeWatchLocked(proxyID, idx)
|
||||
}
|
||||
}
|
||||
|
||||
// closeWatchLocked cleans up state related to a single watcher. It assumes the
|
||||
// lock is held.
|
||||
func (m *Manager) closeWatchLocked(proxyID string, watchIdx uint64) {
|
||||
if watchers, ok := m.watchers[proxyID]; ok {
|
||||
if ch, ok := watchers[watchIdx]; ok {
|
||||
delete(watchers, watchIdx)
|
||||
close(ch)
|
||||
if len(watchers) == 0 {
|
||||
delete(m.watchers, proxyID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close removes all state and stops all running goroutines.
|
||||
func (m *Manager) Close() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if m.stateCh != nil {
|
||||
close(m.stateCh)
|
||||
m.stateCh = nil
|
||||
}
|
||||
|
||||
// Close all current watchers first
|
||||
for proxyID, watchers := range m.watchers {
|
||||
for idx := range watchers {
|
||||
m.closeWatchLocked(proxyID, idx)
|
||||
}
|
||||
}
|
||||
|
||||
// Then close all states
|
||||
for proxyID, state := range m.proxies {
|
||||
state.Close()
|
||||
delete(m.proxies, proxyID)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,288 @@
|
|||
package proxycfg
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
)
|
||||
|
||||
// assertLastReqArgs verifies that each request type had the correct source
|
||||
// parameters (e.g. Datacenter name) and token.
|
||||
func assertLastReqArgs(t *testing.T, types *TestCacheTypes, token string, source *structs.QuerySource) {
|
||||
t.Helper()
|
||||
// Roots needs correct DC and token
|
||||
rootReq := types.roots.lastReq.Load()
|
||||
require.IsType(t, rootReq, &structs.DCSpecificRequest{})
|
||||
require.Equal(t, token, rootReq.(*structs.DCSpecificRequest).Token)
|
||||
require.Equal(t, source.Datacenter, rootReq.(*structs.DCSpecificRequest).Datacenter)
|
||||
|
||||
// Leaf needs correct DC and token
|
||||
leafReq := types.leaf.lastReq.Load()
|
||||
require.IsType(t, leafReq, &cachetype.ConnectCALeafRequest{})
|
||||
require.Equal(t, token, leafReq.(*cachetype.ConnectCALeafRequest).Token)
|
||||
require.Equal(t, source.Datacenter, leafReq.(*cachetype.ConnectCALeafRequest).Datacenter)
|
||||
|
||||
// Intentions needs correct DC and token
|
||||
intReq := types.intentions.lastReq.Load()
|
||||
require.IsType(t, intReq, &structs.IntentionQueryRequest{})
|
||||
require.Equal(t, token, intReq.(*structs.IntentionQueryRequest).Token)
|
||||
require.Equal(t, source.Datacenter, intReq.(*structs.IntentionQueryRequest).Datacenter)
|
||||
}
|
||||
|
||||
func TestManager_BasicLifecycle(t *testing.T) {
|
||||
// Use a mocked cache to make life simpler
|
||||
types := NewTestCacheTypes(t)
|
||||
c := TestCacheWithTypes(t, types)
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
roots, leaf := TestCerts(t)
|
||||
|
||||
// Setup initial values
|
||||
types.roots.value.Store(roots)
|
||||
types.leaf.value.Store(leaf)
|
||||
types.intentions.value.Store(TestIntentions(t))
|
||||
types.health.value.Store(
|
||||
&structs.IndexedCheckServiceNodes{
|
||||
Nodes: TestUpstreamNodes(t),
|
||||
})
|
||||
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
state := local.NewState(local.Config{}, logger, &token.Store{})
|
||||
source := &structs.QuerySource{
|
||||
Node: "node1",
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
// Stub state syncing
|
||||
state.TriggerSyncChanges = func() {}
|
||||
|
||||
// Create manager
|
||||
m, err := NewManager(ManagerConfig{c, state, source, logger})
|
||||
require.NoError(err)
|
||||
|
||||
// And run it
|
||||
go func() {
|
||||
err := m.Run()
|
||||
require.NoError(err)
|
||||
}()
|
||||
|
||||
// Register a proxy for "web"
|
||||
webProxy := &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "web-sidecar-proxy",
|
||||
Service: "web-sidecar-proxy",
|
||||
Port: 9999,
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceID: "web",
|
||||
DestinationServiceName: "web",
|
||||
LocalServiceAddress: "127.0.0.1",
|
||||
LocalServicePort: 8080,
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
Upstreams: structs.TestUpstreams(t),
|
||||
},
|
||||
}
|
||||
|
||||
// BEFORE we register, we should be able to get a watch channel
|
||||
wCh, cancel := m.Watch(webProxy.ID)
|
||||
defer cancel()
|
||||
|
||||
// And it should block with nothing sent on it yet
|
||||
assertWatchChanBlocks(t, wCh)
|
||||
|
||||
require.NoError(state.AddService(webProxy, "my-token"))
|
||||
|
||||
// We should see the initial config delivered but not until after the
|
||||
// coalesce timeout
|
||||
expectSnap := &ConfigSnapshot{
|
||||
ProxyID: webProxy.ID,
|
||||
Address: webProxy.Address,
|
||||
Port: webProxy.Port,
|
||||
Proxy: webProxy.Proxy,
|
||||
Roots: roots,
|
||||
Leaf: leaf,
|
||||
UpstreamEndpoints: map[string]structs.CheckServiceNodes{
|
||||
"service:db": TestUpstreamNodes(t),
|
||||
},
|
||||
}
|
||||
start := time.Now()
|
||||
assertWatchChanRecvs(t, wCh, expectSnap)
|
||||
require.True(time.Since(start) >= coalesceTimeout)
|
||||
|
||||
assertLastReqArgs(t, types, "my-token", source)
|
||||
|
||||
// Update NodeConfig
|
||||
webProxy.Port = 7777
|
||||
require.NoError(state.AddService(webProxy, "my-token"))
|
||||
|
||||
expectSnap.Port = 7777
|
||||
assertWatchChanRecvs(t, wCh, expectSnap)
|
||||
|
||||
// Register a second watcher
|
||||
wCh2, cancel2 := m.Watch(webProxy.ID)
|
||||
defer cancel2()
|
||||
|
||||
// New watcher should immediately receive the current state
|
||||
assertWatchChanRecvs(t, wCh2, expectSnap)
|
||||
|
||||
// Change token
|
||||
require.NoError(state.AddService(webProxy, "other-token"))
|
||||
assertWatchChanRecvs(t, wCh, expectSnap)
|
||||
assertWatchChanRecvs(t, wCh2, expectSnap)
|
||||
|
||||
// This is actually sort of timing dependent - the cache background fetcher
|
||||
// will still be fetching with the old token, but we rely on the fact that our
|
||||
// mock type will have been blocked on those for a while.
|
||||
assertLastReqArgs(t, types, "other-token", source)
|
||||
// Update roots
|
||||
newRoots, newLeaf := TestCerts(t)
|
||||
newRoots.Roots = append(newRoots.Roots, roots.Roots...)
|
||||
types.roots.Set(newRoots)
|
||||
|
||||
// Expect new roots in snapshot
|
||||
expectSnap.Roots = newRoots
|
||||
assertWatchChanRecvs(t, wCh, expectSnap)
|
||||
assertWatchChanRecvs(t, wCh2, expectSnap)
|
||||
|
||||
// Update leaf
|
||||
types.leaf.Set(newLeaf)
|
||||
|
||||
// Expect new roots in snapshot
|
||||
expectSnap.Leaf = newLeaf
|
||||
assertWatchChanRecvs(t, wCh, expectSnap)
|
||||
assertWatchChanRecvs(t, wCh2, expectSnap)
|
||||
|
||||
// Remove the proxy
|
||||
state.RemoveService(webProxy.ID)
|
||||
|
||||
// Chan should NOT close
|
||||
assertWatchChanBlocks(t, wCh)
|
||||
assertWatchChanBlocks(t, wCh2)
|
||||
|
||||
// Re-add the proxy with another new port
|
||||
webProxy.Port = 3333
|
||||
require.NoError(state.AddService(webProxy, "other-token"))
|
||||
|
||||
// Same watch chan should be notified again
|
||||
expectSnap.Port = 3333
|
||||
assertWatchChanRecvs(t, wCh, expectSnap)
|
||||
assertWatchChanRecvs(t, wCh2, expectSnap)
|
||||
|
||||
// Cancel watch
|
||||
cancel()
|
||||
|
||||
// Watch chan should be closed
|
||||
assertWatchChanRecvs(t, wCh, nil)
|
||||
|
||||
// We specifically don't remove the proxy or cancel the second watcher to
|
||||
// ensure both are cleaned up by close.
|
||||
require.NoError(m.Close())
|
||||
|
||||
// Sanity check the state is clean
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
require.Len(m.proxies, 0)
|
||||
require.Len(m.watchers, 0)
|
||||
}
|
||||
|
||||
func assertWatchChanBlocks(t *testing.T, ch <-chan *ConfigSnapshot) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("Should be nothing sent on watch chan yet")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func assertWatchChanRecvs(t *testing.T, ch <-chan *ConfigSnapshot, expect *ConfigSnapshot) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case got, ok := <-ch:
|
||||
require.Equal(t, expect, got)
|
||||
if expect == nil {
|
||||
require.False(t, ok, "watch chan should be closed")
|
||||
}
|
||||
case <-time.After(50*time.Millisecond + coalesceTimeout):
|
||||
t.Fatal("recv timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_deliverLatest(t *testing.T) {
|
||||
// None of these need to do anything to test this method just be valid
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
cfg := ManagerConfig{
|
||||
Cache: cache.New(nil),
|
||||
State: local.NewState(local.Config{}, logger, &token.Store{}),
|
||||
Source: &structs.QuerySource{
|
||||
Node: "node1",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
Logger: logger,
|
||||
}
|
||||
require := require.New(t)
|
||||
|
||||
m, err := NewManager(cfg)
|
||||
require.NoError(err)
|
||||
|
||||
snap1 := &ConfigSnapshot{
|
||||
ProxyID: "test-proxy",
|
||||
Port: 1111,
|
||||
}
|
||||
snap2 := &ConfigSnapshot{
|
||||
ProxyID: "test-proxy",
|
||||
Port: 2222,
|
||||
}
|
||||
|
||||
// Put an overall time limit on this test case so we don't have to guard every
|
||||
// call to ensure the whole test doesn't deadlock.
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
t.Fatal("test timed out")
|
||||
})
|
||||
|
||||
// test 1 buffered chan
|
||||
ch1 := make(chan *ConfigSnapshot, 1)
|
||||
|
||||
// Sending to an unblocked chan should work
|
||||
m.deliverLatest(snap1, ch1)
|
||||
|
||||
// Check it was delivered
|
||||
require.Equal(snap1, <-ch1)
|
||||
|
||||
// Now send both without reading simulating a slow client
|
||||
m.deliverLatest(snap1, ch1)
|
||||
m.deliverLatest(snap2, ch1)
|
||||
|
||||
// Check we got the _second_ one
|
||||
require.Equal(snap2, <-ch1)
|
||||
|
||||
// Same again for 5-buffered chan
|
||||
ch5 := make(chan *ConfigSnapshot, 5)
|
||||
|
||||
// Sending to an unblocked chan should work
|
||||
m.deliverLatest(snap1, ch5)
|
||||
|
||||
// Check it was delivered
|
||||
require.Equal(snap1, <-ch5)
|
||||
|
||||
// Now send enough to fill the chan simulating a slow client
|
||||
for i := 0; i < 5; i++ {
|
||||
m.deliverLatest(snap1, ch5)
|
||||
}
|
||||
m.deliverLatest(snap2, ch5)
|
||||
|
||||
// Check we got the _second_ one
|
||||
require.Equal(snap2, <-ch5)
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
// Package proxycfg provides a component that monitors local agent state for
|
||||
// Connect proxy service registrations and maintains the necessary cache state
|
||||
// for those proxies locally. Local cache state keeps pull based proxies (e.g.
|
||||
// the built in one) performant even on first request/startup, and allows for
|
||||
// push-based proxy APIs (e.g. xDS for Envoy) to be notified of updates to the
|
||||
// proxy configuration.
|
||||
//
|
||||
// The relationship with other agent components looks like this:
|
||||
//
|
||||
// +------------------------------------------+
|
||||
// | AGENT |
|
||||
// | |
|
||||
// | +--------+ 1. +----------+ |
|
||||
// | | local |<-----+ proxycfg |<--------+ |
|
||||
// | | state +----->| Manager |<---+ | |
|
||||
// | +--------+ 2. +^---+-----+ | | |
|
||||
// | 5.| | | | |
|
||||
// | +----------+ | +-------+--+ |4. |
|
||||
// | | +->| proxycfg | | |
|
||||
// | | 3.| | State | | |
|
||||
// | | | +----------+ | |
|
||||
// | | | | |
|
||||
// | | | +----------+ | |
|
||||
// | | +->| proxycfg +-+ |
|
||||
// | | | State | |
|
||||
// | | +----------+ |
|
||||
// | |6. |
|
||||
// | +----v---+ |
|
||||
// | | xDS | |
|
||||
// | | Server | |
|
||||
// | +--------+ |
|
||||
// | |
|
||||
// +------------------------------------------+
|
||||
//
|
||||
// 1. Manager watches local state for changes.
|
||||
// 2. On local state change manager is notified and iterates through state
|
||||
// looking for proxy service registrations.
|
||||
// 3. For each proxy service registered, the manager maintains a State
|
||||
// instance, recreating on change, removing when deregistered.
|
||||
// 4. State instance copies the parts of the the proxy service registration
|
||||
// needed to configure proxy, and sets up blocking watches on the local
|
||||
// agent cache for all remote state needed: root and leaf certs, intentions,
|
||||
// and service discovery results for the specified upstreams. This ensures
|
||||
// these results are always in local cache for "pull" based proxies like the
|
||||
// built-in one.
|
||||
// 5. If needed, pull-based proxy config APIs like the xDS server can Watch the
|
||||
// config for a given proxy service.
|
||||
// 6. Watchers get notified every time something changes the current snapshot
|
||||
// of config for the proxy. That might be changes to the registration,
|
||||
// certificate rotations, changes to the upstreams required (needing
|
||||
// different listener config), or changes to the service discovery results
|
||||
// for any upstream (e.g. new instance of upstream service came up).
|
||||
package proxycfg
|
|
@ -0,0 +1,36 @@
|
|||
package proxycfg
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/mitchellh/copystructure"
|
||||
)
|
||||
|
||||
// ConfigSnapshot captures all the resulting config needed for a proxy instance.
|
||||
// It is meant to be point-in-time coherent and is used to deliver the current
|
||||
// config state to observers who need it to be pushed in (e.g. XDS server).
|
||||
type ConfigSnapshot struct {
|
||||
ProxyID string
|
||||
Address string
|
||||
Port int
|
||||
Proxy structs.ConnectProxyConfig
|
||||
Roots *structs.IndexedCARoots
|
||||
Leaf *structs.IssuedCert
|
||||
UpstreamEndpoints map[string]structs.CheckServiceNodes
|
||||
|
||||
// Skip intentions for now as we don't push those down yet, just pre-warm them.
|
||||
}
|
||||
|
||||
// Valid returns whether or not the snapshot has all required fields filled yet.
|
||||
func (s *ConfigSnapshot) Valid() bool {
|
||||
return s.Roots != nil && s.Leaf != nil
|
||||
}
|
||||
|
||||
// Clone makes a deep copy of the snapshot we can send to other goroutines
|
||||
// without worrying that they will racily read or mutate shared maps etc.
|
||||
func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
|
||||
snapCopy, err := copystructure.Copy(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return snapCopy.(*ConfigSnapshot), nil
|
||||
}
|
|
@ -0,0 +1,346 @@
|
|||
package proxycfg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/mitchellh/copystructure"
|
||||
)
|
||||
|
||||
const (
|
||||
coalesceTimeout = 200 * time.Millisecond
|
||||
rootsWatchID = "roots"
|
||||
leafWatchID = "leaf"
|
||||
intentionsWatchID = "intentions"
|
||||
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
|
||||
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
|
||||
)
|
||||
|
||||
// state holds all the state needed to maintain the config for a registered
|
||||
// connect-proxy service. When a proxy registration is changed, the entire state
|
||||
// is discarded and a new one created.
|
||||
type state struct {
|
||||
// logger, source and cache are required to be set before calling Watch.
|
||||
logger *log.Logger
|
||||
source *structs.QuerySource
|
||||
cache *cache.Cache
|
||||
|
||||
// ctx and cancel store the context created during initWatches call
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
proxyID string
|
||||
address string
|
||||
port int
|
||||
proxyCfg structs.ConnectProxyConfig
|
||||
token string
|
||||
|
||||
ch chan cache.UpdateEvent
|
||||
snapCh chan ConfigSnapshot
|
||||
reqCh chan chan *ConfigSnapshot
|
||||
}
|
||||
|
||||
// newState populates the state struct by copying relevant fields from the
|
||||
// NodeService and Token. We copy so that we can use them in a separate
|
||||
// goroutine later without reasoning about races with the NodeService passed
|
||||
// (especially for embedded fields like maps and slices).
|
||||
//
|
||||
// The returned state needs it's required dependencies to be set before Watch
|
||||
// can be called.
|
||||
func newState(ns *structs.NodeService, token string) (*state, error) {
|
||||
if ns.Kind != structs.ServiceKindConnectProxy {
|
||||
return nil, errors.New("not a connect-proxy")
|
||||
}
|
||||
|
||||
// Copy the config map
|
||||
proxyCfgRaw, err := copystructure.Copy(ns.Proxy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
proxyCfg, ok := proxyCfgRaw.(structs.ConnectProxyConfig)
|
||||
if !ok {
|
||||
return nil, errors.New("failed to copy proxy config")
|
||||
}
|
||||
|
||||
return &state{
|
||||
proxyID: ns.ID,
|
||||
address: ns.Address,
|
||||
port: ns.Port,
|
||||
proxyCfg: proxyCfg,
|
||||
token: token,
|
||||
// 10 is fairly arbitrary here but allow for the 3 mandatory and a
|
||||
// reasonable number of upstream watches to all deliver their initial
|
||||
// messages in parallel without blocking the cache.Notify loops. It's not a
|
||||
// huge deal if we do for a short period so we don't need to be more
|
||||
// conservative to handle larger numbers of upstreams correctly but gives
|
||||
// some head room for normal operation to be non-blocking in most typical
|
||||
// cases.
|
||||
ch: make(chan cache.UpdateEvent, 10),
|
||||
snapCh: make(chan ConfigSnapshot, 1),
|
||||
reqCh: make(chan chan *ConfigSnapshot, 1),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Watch initialised watches on all necessary cache data for the current proxy
|
||||
// registration state and returns a chan to observe updates to the
|
||||
// ConfigSnapshot that contains all necessary config state. The chan is closed
|
||||
// when the state is Closed.
|
||||
func (s *state) Watch() (<-chan ConfigSnapshot, error) {
|
||||
s.ctx, s.cancel = context.WithCancel(context.Background())
|
||||
|
||||
err := s.initWatches()
|
||||
if err != nil {
|
||||
s.cancel()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go s.run()
|
||||
|
||||
return s.snapCh, nil
|
||||
}
|
||||
|
||||
// Close discards the state and stops any long-running watches.
|
||||
func (s *state) Close() error {
|
||||
if s.cancel != nil {
|
||||
s.cancel()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// initWatches sets up the watches needed based on current proxy registration
|
||||
// state.
|
||||
func (s *state) initWatches() error {
|
||||
// Watch for root changes
|
||||
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
}, rootsWatchID, s.ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Watch the leaf cert
|
||||
err = s.cache.Notify(s.ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
Token: s.token,
|
||||
Service: s.proxyCfg.DestinationServiceName,
|
||||
}, leafWatchID, s.ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Watch for intention updates
|
||||
err = s.cache.Notify(s.ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Match: &structs.IntentionQueryMatch{
|
||||
Type: structs.IntentionMatchDestination,
|
||||
Entries: []structs.IntentionMatchEntry{
|
||||
{
|
||||
Namespace: structs.IntentionDefaultNamespace,
|
||||
Name: s.proxyCfg.DestinationServiceName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, intentionsWatchID, s.ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Watch for updates to service endpoints for all upstreams
|
||||
for _, u := range s.proxyCfg.Upstreams {
|
||||
dc := s.source.Datacenter
|
||||
if u.Datacenter != "" {
|
||||
dc = u.Datacenter
|
||||
}
|
||||
|
||||
switch u.DestinationType {
|
||||
case structs.UpstreamDestTypePreparedQuery:
|
||||
// TODO(banks): prepared queries don't support blocking. We need to come
|
||||
// up with an alternative to Notify that will poll at a sensible rate.
|
||||
|
||||
// err = c.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
|
||||
// Datacenter: dc,
|
||||
// QueryOptions: structs.QueryOptions{Token: token},
|
||||
// QueryIDOrName: u.DestinationName,
|
||||
// Connect: true,
|
||||
// }, u.Identifier(), ch)
|
||||
case structs.UpstreamDestTypeService:
|
||||
fallthrough
|
||||
case "": // Treat unset as the default Service type
|
||||
err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: u.DestinationName,
|
||||
Connect: true,
|
||||
}, u.Identifier(), s.ch)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown upstream type: %q", u.DestinationType)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *state) run() {
|
||||
// Close the channel we return from Watch when we stop so consumers can stop
|
||||
// watching and clean up their goroutines. It's important we do this here and
|
||||
// not in Close since this routine sends on this chan and so might panic if it
|
||||
// gets closed from another goroutine.
|
||||
defer close(s.snapCh)
|
||||
|
||||
snap := ConfigSnapshot{
|
||||
ProxyID: s.proxyID,
|
||||
Address: s.address,
|
||||
Port: s.port,
|
||||
Proxy: s.proxyCfg,
|
||||
UpstreamEndpoints: make(map[string]structs.CheckServiceNodes),
|
||||
}
|
||||
// This turns out to be really fiddly/painful by just using time.Timer.C
|
||||
// directly in the code below since you can't detect when a timer is stopped
|
||||
// vs waiting in order to know to reset it. So just use a chan to send
|
||||
// ourselves messages.
|
||||
sendCh := make(chan struct{})
|
||||
var coalesceTimer *time.Timer
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case u := <-s.ch:
|
||||
if err := s.handleUpdate(u, &snap); err != nil {
|
||||
s.logger.Printf("[ERR] %s watch error: %s", u.CorrelationID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
case <-sendCh:
|
||||
// Make a deep copy of snap so we don't mutate any of the embedded structs
|
||||
// etc on future updates.
|
||||
snapCopy, err := snap.Clone()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] Failed to copy config snapshot for proxy %s",
|
||||
s.proxyID)
|
||||
continue
|
||||
}
|
||||
s.snapCh <- *snapCopy
|
||||
// Allow the next change to trigger a send
|
||||
coalesceTimer = nil
|
||||
|
||||
// Skip rest of loop - there is nothing to send since nothing changed on
|
||||
// this iteration
|
||||
continue
|
||||
|
||||
case replyCh := <-s.reqCh:
|
||||
if !snap.Valid() {
|
||||
// Not valid yet just respond with nil and move on to next task.
|
||||
replyCh <- nil
|
||||
continue
|
||||
}
|
||||
// Make a deep copy of snap so we don't mutate any of the embedded structs
|
||||
// etc on future updates.
|
||||
snapCopy, err := snap.Clone()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] Failed to copy config snapshot for proxy %s",
|
||||
s.proxyID)
|
||||
continue
|
||||
}
|
||||
replyCh <- snapCopy
|
||||
|
||||
// Skip rest of loop - there is nothing to send since nothing changed on
|
||||
// this iteration
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if snap is complete enough to be a valid config to deliver to a
|
||||
// proxy yet.
|
||||
if snap.Valid() {
|
||||
// Don't send it right away, set a short timer that will wait for updates
|
||||
// from any of the other cache values and deliver them all together.
|
||||
if coalesceTimer == nil {
|
||||
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
|
||||
// This runs in another goroutine so we can't just do the send
|
||||
// directly here as access to snap is racy. Instead, signal the main
|
||||
// loop above.
|
||||
sendCh <- struct{}{}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
switch u.CorrelationID {
|
||||
case rootsWatchID:
|
||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for roots response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
case leafWatchID:
|
||||
leaf, ok := u.Result.(*structs.IssuedCert)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for leaf response: %T", u.Result)
|
||||
}
|
||||
snap.Leaf = leaf
|
||||
case intentionsWatchID:
|
||||
// Not in snapshot currently, no op
|
||||
default:
|
||||
// Service discovery result, figure out which type
|
||||
switch {
|
||||
case strings.HasPrefix(u.CorrelationID, serviceIDPrefix):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service response: %T", u.Result)
|
||||
}
|
||||
snap.UpstreamEndpoints[u.CorrelationID] = resp.Nodes
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, preparedQueryIDPrefix):
|
||||
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for prepared query response: %T", u.Result)
|
||||
}
|
||||
snap.UpstreamEndpoints[u.CorrelationID] = resp.Nodes
|
||||
|
||||
default:
|
||||
return errors.New("unknown correlation ID")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CurrentSnapshot synchronously returns the current ConfigSnapshot if there is
|
||||
// one ready. If we don't have one yet because not all necessary parts have been
|
||||
// returned (i.e. both roots and leaf cert), nil is returned.
|
||||
func (s *state) CurrentSnapshot() *ConfigSnapshot {
|
||||
// Make a chan for the response to be sent on
|
||||
ch := make(chan *ConfigSnapshot, 1)
|
||||
s.reqCh <- ch
|
||||
// Wait for the response
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// Changed returns whether or not the passed NodeService has had any of the
|
||||
// fields we care about for config state watching changed or a different token.
|
||||
func (s *state) Changed(ns *structs.NodeService, token string) bool {
|
||||
if ns == nil {
|
||||
return true
|
||||
}
|
||||
return ns.Kind != structs.ServiceKindConnectProxy ||
|
||||
s.proxyID != ns.ID ||
|
||||
s.address != ns.Address ||
|
||||
s.port != ns.Port ||
|
||||
!reflect.DeepEqual(s.proxyCfg, ns.Proxy) ||
|
||||
s.token != token
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
package proxycfg
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func TestStateChanged(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
ns *structs.NodeService
|
||||
token string
|
||||
mutate func(ns structs.NodeService, token string) (*structs.NodeService, string)
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "nil node service",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
return nil, token
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "same service",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
return &ns, token
|
||||
}, want: false,
|
||||
},
|
||||
{
|
||||
name: "same service, different token",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
token: "foo",
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
return &ns, "bar"
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "different service ID",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
token: "foo",
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
ns.ID = "badger"
|
||||
return &ns, token
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "different address",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
token: "foo",
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
ns.Address = "10.10.10.10"
|
||||
return &ns, token
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "different port",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
token: "foo",
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
ns.Port = 12345
|
||||
return &ns, token
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "different service kind",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
token: "foo",
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
ns.Kind = ""
|
||||
return &ns, token
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "different proxy target",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
token: "foo",
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
ns.Proxy.DestinationServiceName = "badger"
|
||||
return &ns, token
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "different proxy upstreams",
|
||||
ns: structs.TestNodeServiceProxy(t),
|
||||
token: "foo",
|
||||
mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) {
|
||||
ns.Proxy.Upstreams = nil
|
||||
return &ns, token
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
state, err := newState(tt.ns, tt.token)
|
||||
require.NoError(err)
|
||||
otherNS, otherToken := tt.mutate(*tt.ns, tt.token)
|
||||
require.Equal(tt.want, state.Changed(otherNS, otherToken))
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,246 @@
|
|||
package proxycfg
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestCacheTypes encapsulates all the different cache types proxycfg.State will
|
||||
// watch/request for contolling one during testing.
|
||||
type TestCacheTypes struct {
|
||||
roots *ControllableCacheType
|
||||
leaf *ControllableCacheType
|
||||
intentions *ControllableCacheType
|
||||
health *ControllableCacheType
|
||||
query *ControllableCacheType
|
||||
}
|
||||
|
||||
// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that
|
||||
// proxycfg will watch suitable for testing a proxycfg.State or Manager.
|
||||
func NewTestCacheTypes(t testing.T) *TestCacheTypes {
|
||||
t.Helper()
|
||||
ct := &TestCacheTypes{
|
||||
roots: NewControllableCacheType(t),
|
||||
leaf: NewControllableCacheType(t),
|
||||
intentions: NewControllableCacheType(t),
|
||||
health: NewControllableCacheType(t),
|
||||
query: NewControllableCacheType(t),
|
||||
}
|
||||
ct.query.blocking = false
|
||||
return ct
|
||||
}
|
||||
|
||||
// TestCacheWithTypes registers ControllableCacheTypes for all types that
|
||||
// proxycfg will watch suitable for testing a proxycfg.State or Manager.
|
||||
func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
|
||||
c := cache.TestCache(t)
|
||||
c.RegisterType(cachetype.ConnectCARootName, types.roots, &cache.RegisterOptions{
|
||||
Refresh: true,
|
||||
RefreshTimer: 0,
|
||||
RefreshTimeout: 10 * time.Minute,
|
||||
})
|
||||
c.RegisterType(cachetype.ConnectCALeafName, types.leaf, &cache.RegisterOptions{
|
||||
Refresh: true,
|
||||
RefreshTimer: 0,
|
||||
RefreshTimeout: 10 * time.Minute,
|
||||
})
|
||||
c.RegisterType(cachetype.IntentionMatchName, types.intentions, &cache.RegisterOptions{
|
||||
Refresh: true,
|
||||
RefreshTimer: 0,
|
||||
RefreshTimeout: 10 * time.Minute,
|
||||
})
|
||||
c.RegisterType(cachetype.HealthServicesName, types.health, &cache.RegisterOptions{
|
||||
Refresh: true,
|
||||
RefreshTimer: 0,
|
||||
RefreshTimeout: 10 * time.Minute,
|
||||
})
|
||||
c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{
|
||||
Refresh: false,
|
||||
})
|
||||
return c
|
||||
}
|
||||
|
||||
// TestCerts genereates a CA and Leaf suitable for returning as mock CA
|
||||
// root/leaf cache requests.
|
||||
func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) {
|
||||
t.Helper()
|
||||
|
||||
ca := connect.TestCA(t, nil)
|
||||
roots := &structs.IndexedCARoots{
|
||||
ActiveRootID: ca.ID,
|
||||
TrustDomain: connect.TestClusterID,
|
||||
Roots: []*structs.CARoot{ca},
|
||||
}
|
||||
return roots, TestLeafForCA(t, ca)
|
||||
}
|
||||
|
||||
// TestLeafForCA genereates new Leaf suitable for returning as mock CA
|
||||
// leaf cache resonse, signed by an existing CA.
|
||||
func TestLeafForCA(t testing.T, ca *structs.CARoot) *structs.IssuedCert {
|
||||
leafPEM, pkPEM := connect.TestLeaf(t, "web", ca)
|
||||
|
||||
leafCert, err := connect.ParseCert(leafPEM)
|
||||
require.NoError(t, err)
|
||||
|
||||
return &structs.IssuedCert{
|
||||
SerialNumber: connect.HexString(leafCert.SerialNumber.Bytes()),
|
||||
CertPEM: leafPEM,
|
||||
PrivateKeyPEM: pkPEM,
|
||||
Service: "web",
|
||||
ServiceURI: leafCert.URIs[0].String(),
|
||||
ValidAfter: leafCert.NotBefore,
|
||||
ValidBefore: leafCert.NotAfter,
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntentions returns a sample intentions match result useful to
|
||||
// mocking service discovery cache results.
|
||||
func TestIntentions(t testing.T) *structs.IndexedIntentionMatches {
|
||||
return &structs.IndexedIntentionMatches{
|
||||
Matches: []structs.Intentions{
|
||||
[]*structs.Intention{
|
||||
&structs.Intention{
|
||||
ID: "foo",
|
||||
SourceNS: "default",
|
||||
SourceName: "billing",
|
||||
DestinationNS: "default",
|
||||
DestinationName: "web",
|
||||
Action: structs.IntentionActionAllow,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpstreamNodes returns a sample service discovery result useful to
|
||||
// mocking service discovery cache results.
|
||||
func TestUpstreamNodes(t testing.T) structs.CheckServiceNodes {
|
||||
return structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
ID: "test1",
|
||||
Node: "test1",
|
||||
Address: "10.10.1.1",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
Service: structs.TestNodeService(t),
|
||||
},
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
ID: "test2",
|
||||
Node: "test2",
|
||||
Address: "10.10.1.2",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
Service: structs.TestNodeService(t),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TestConfigSnapshot returns a fully populated snapshot
|
||||
func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
|
||||
roots, leaf := TestCerts(t)
|
||||
return &ConfigSnapshot{
|
||||
ProxyID: "web-sidecar-proxy",
|
||||
Address: "0.0.0.0",
|
||||
Port: 9999,
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceID: "web",
|
||||
DestinationServiceName: "web",
|
||||
LocalServiceAddress: "127.0.0.1",
|
||||
LocalServicePort: 8080,
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
Upstreams: structs.TestUpstreams(t),
|
||||
},
|
||||
Roots: roots,
|
||||
Leaf: leaf,
|
||||
UpstreamEndpoints: map[string]structs.CheckServiceNodes{
|
||||
"service:db": TestUpstreamNodes(t),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ControllableCacheType is a cache.Type that simulates a typical blocking RPC
|
||||
// but lets us controll the responses and when they are deliverd easily.
|
||||
type ControllableCacheType struct {
|
||||
index uint64
|
||||
value atomic.Value
|
||||
// Need a condvar to trigger all blocking requests (there might be multiple
|
||||
// for same type due to background refresh and timing issues) when values
|
||||
// change. Chans make it nondeterministic which one triggers or need extra
|
||||
// locking to coodrinate rplacing after close etc.
|
||||
triggerMu sync.Mutex
|
||||
trigger *sync.Cond
|
||||
blocking bool
|
||||
lastReq atomic.Value
|
||||
}
|
||||
|
||||
// NewControllableCacheType returns a cache.Type that can be controlled for
|
||||
// testing.
|
||||
func NewControllableCacheType(t testing.T) *ControllableCacheType {
|
||||
c := &ControllableCacheType{
|
||||
index: 5,
|
||||
blocking: true,
|
||||
}
|
||||
c.trigger = sync.NewCond(&c.triggerMu)
|
||||
return c
|
||||
}
|
||||
|
||||
// Set sets the response value to be returned from subsequent cache gets for the
|
||||
// type.
|
||||
func (ct *ControllableCacheType) Set(value interface{}) {
|
||||
atomic.AddUint64(&ct.index, 1)
|
||||
ct.value.Store(value)
|
||||
ct.triggerMu.Lock()
|
||||
ct.trigger.Broadcast()
|
||||
ct.triggerMu.Unlock()
|
||||
}
|
||||
|
||||
// Fetch implements cache.Type. It simulates blocking or non-blocking queries.
|
||||
func (ct *ControllableCacheType) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||
|
||||
index := atomic.LoadUint64(&ct.index)
|
||||
|
||||
ct.lastReq.Store(req)
|
||||
|
||||
shouldBlock := ct.blocking && opts.MinIndex > 0 && opts.MinIndex == index
|
||||
if shouldBlock {
|
||||
// Wait for return to be triggered. We ignore timeouts based on opts.Timeout
|
||||
// since in practice they will always be way longer than our tests run for
|
||||
// and the caller can simulate timeout by triggering return without changing
|
||||
// index or value.
|
||||
ct.triggerMu.Lock()
|
||||
ct.trigger.Wait()
|
||||
ct.triggerMu.Unlock()
|
||||
}
|
||||
|
||||
// reload index as it probably got bumped
|
||||
index = atomic.LoadUint64(&ct.index)
|
||||
val := ct.value.Load()
|
||||
|
||||
if err, ok := val.(error); ok {
|
||||
return cache.FetchResult{
|
||||
Value: nil,
|
||||
Index: index,
|
||||
}, err
|
||||
}
|
||||
return cache.FetchResult{
|
||||
Value: val,
|
||||
Index: index,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SupportsBlocking implements cache.Type
|
||||
func (ct *ControllableCacheType) SupportsBlocking() bool {
|
||||
return ct.blocking
|
||||
}
|
|
@ -116,6 +116,8 @@ type ConnectManagedProxy struct {
|
|||
type ConnectManagedProxyConfig struct {
|
||||
BindAddress string `mapstructure:"bind_address"`
|
||||
BindPort int `mapstructure:"bind_port"`
|
||||
LocalServiceAddress string `mapstructure:"local_service_address"`
|
||||
LocalServicePort int `mapstructure:"local_service_port"`
|
||||
}
|
||||
|
||||
// ParseConfig attempts to read the fields we care about from the otherwise
|
||||
|
|
|
@ -157,6 +157,28 @@ func (u *Upstream) ToAPI() api.Upstream {
|
|||
}
|
||||
}
|
||||
|
||||
// Identifier returns a string representation that uniquely identifies the
|
||||
// upstream in a canonical but human readable way.
|
||||
func (u *Upstream) Identifier() string {
|
||||
name := u.DestinationName
|
||||
if u.DestinationNamespace != "" && u.DestinationNamespace != "default" {
|
||||
name = u.DestinationNamespace + "/" + u.DestinationName
|
||||
}
|
||||
if u.Datacenter != "" {
|
||||
name += "?dc=" + u.Datacenter
|
||||
}
|
||||
typ := u.DestinationType
|
||||
if typ == "" {
|
||||
typ = UpstreamDestTypeService
|
||||
}
|
||||
return typ + ":" + name
|
||||
}
|
||||
|
||||
// String implements Stringer by returning the Identifier.
|
||||
func (u *Upstream) String() string {
|
||||
return u.Identifier()
|
||||
}
|
||||
|
||||
// UpstreamFromAPI is a helper for converting api.Upstream to Upstream.
|
||||
func UpstreamFromAPI(u api.Upstream) Upstream {
|
||||
return Upstream{
|
||||
|
|
Loading…
Reference in New Issue