From 0f27ffd163155293e881c290ed35693a7223495f Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Wed, 3 Oct 2018 13:36:38 +0100 Subject: [PATCH] Proxy Config Manager (#4729) * Proxy Config Manager This component watches for local state changes on the agent and ensures that each service registered locally with Kind == connect-proxy has it's state being actively populated in the cache. This serves two purposes: 1. For the built-in proxy, it ensures that the state needed to accept connections is available in RAM shortly after registration and likely before the proxy actually starts accepting traffic. 2. For (future - next PR) xDS server and other possible future proxies that require _push_ based config discovery, this provides a mechanism to subscribe and be notified about updates to a proxy instance's config including upstream service discovery results. * Address review comments * Better comments; Better delivery of latest snapshot for slow watchers; Embed Config * Comment typos * Add upstream Stringer for funsies --- agent/local/state.go | 56 +++++ agent/local/state_test.go | 61 +++++ agent/proxycfg/manager.go | 350 ++++++++++++++++++++++++++ agent/proxycfg/manager_test.go | 288 +++++++++++++++++++++ agent/proxycfg/proxycfg.go | 53 ++++ agent/proxycfg/snapshot.go | 36 +++ agent/proxycfg/state.go | 346 +++++++++++++++++++++++++ agent/proxycfg/state_test.go | 114 +++++++++ agent/proxycfg/testing.go | 246 ++++++++++++++++++ agent/structs/connect.go | 6 +- agent/structs/connect_proxy_config.go | 22 ++ 11 files changed, 1576 insertions(+), 2 deletions(-) create mode 100644 agent/proxycfg/manager.go create mode 100644 agent/proxycfg/manager_test.go create mode 100644 agent/proxycfg/proxycfg.go create mode 100644 agent/proxycfg/snapshot.go create mode 100644 agent/proxycfg/state.go create mode 100644 agent/proxycfg/state_test.go create mode 100644 agent/proxycfg/testing.go diff --git a/agent/local/state.go b/agent/local/state.go index fcfb6a49dd..ebca0c8fc8 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -190,6 +190,17 @@ type State struct { // tokens contains the ACL tokens tokens *token.Store + // notifyHandlers is a map of registered channel listeners that are sent + // messages whenever state changes occur. For now these events only include + // service registration and deregistration since that is all that is needed + // but the same mechanism could be used for other state changes. + // + // Note that we haven't refactored managedProxyHandlers into this mechanism + // yet because that is soon to be deprecated and removed so it's easier to + // just leave them separate until managed proxies are removed entirely. Any + // future notifications should re-use this mechanism though. + notifyHandlers map[chan<- struct{}]struct{} + // managedProxies is a map of all managed connect proxies registered locally on // this agent. This is NOT kept in sync with servers since it's agent-local // config only. 
Proxy instances have separate service registrations in the @@ -215,6 +226,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { checkAliases: make(map[string]map[types.CheckID]chan<- struct{}), metadata: make(map[string]string), tokens: tokens, + notifyHandlers: make(map[chan<- struct{}]struct{}), managedProxies: make(map[string]*ManagedProxy), managedProxyHandlers: make(map[chan<- struct{}]struct{}), } @@ -290,6 +302,7 @@ func (l *State) RemoveService(id string) error { s.WatchCh = nil } l.TriggerSyncChanges() + l.broadcastUpdateLocked() return nil } @@ -355,6 +368,7 @@ func (l *State) SetServiceState(s *ServiceState) { } l.TriggerSyncChanges() + l.broadcastUpdateLocked() } // ServiceStates returns a shallow copy of all service state records. @@ -683,11 +697,18 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token, Service: target.Service + "-proxy", Proxy: structs.ConnectProxyConfig{ DestinationServiceName: target.Service, + LocalServiceAddress: cfg.LocalServiceAddress, + LocalServicePort: cfg.LocalServicePort, }, Address: cfg.BindAddress, Port: cfg.BindPort, } + // Set default port now while the target is known + if svc.Proxy.LocalServicePort < 1 { + svc.Proxy.LocalServicePort = target.Port + } + // Lock now. We can't lock earlier as l.Service would deadlock and shouldn't // anyway to minimise the critical section. l.Lock() @@ -821,6 +842,41 @@ func (l *State) Proxies() map[string]*ManagedProxy { return m } +// broadcastUpdateLocked assumes l is locked and delivers an update to all +// registered watchers. +func (l *State) broadcastUpdateLocked() { + for ch := range l.notifyHandlers { + // Do not block + select { + case ch <- struct{}{}: + default: + } + } +} + +// Notify will register a channel to receive messages when the local state +// changes. Only service add/remove are supported for now. See notes on +// l.notifyHandlers for more details. +// +// This will not block on channel send so ensure the channel has a buffer. Note +// that any buffer size is generally fine since actual data is not sent over the +// channel, so a dropped send due to a full buffer does not result in any loss +// of data. The fact that a buffer already contains a notification means that +// the receiver will still be notified that changes occurred. +func (l *State) Notify(ch chan<- struct{}) { + l.Lock() + defer l.Unlock() + l.notifyHandlers[ch] = struct{}{} +} + +// StopNotify will deregister a channel receiving state change notifications. +// Pair this with all calls to Notify to clean up state. +func (l *State) StopNotify(ch chan<- struct{}) { + l.Lock() + defer l.Unlock() + delete(l.notifyHandlers, ch) +} + // NotifyProxy will register a channel to receive messages when the // configuration or set of proxies changes. This will not block on // channel send so ensure the channel has a buffer. 
Note that any buffer diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 4eb16e9537..70c87811f5 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1901,6 +1901,67 @@ func checksInSync(state *local.State, wantChecks int) error { return nil } +func TestState_Notify(t *testing.T) { + t.Parallel() + + state := local.NewState(local.Config{}, + log.New(os.Stderr, "", log.LstdFlags), &token.Store{}) + + // Stub state syncing + state.TriggerSyncChanges = func() {} + + require := require.New(t) + assert := assert.New(t) + + // Register a notifier + notifyCh := make(chan struct{}, 1) + state.Notify(notifyCh) + defer state.StopNotify(notifyCh) + assert.Empty(notifyCh) + drainCh(notifyCh) + + // Add a service + err := state.AddService(&structs.NodeService{ + Service: "web", + }, "fake-token-web") + require.NoError(err) + + // Should have a notification + assert.NotEmpty(notifyCh) + drainCh(notifyCh) + + // Re-Add same service + err = state.AddService(&structs.NodeService{ + Service: "web", + Port: 4444, + }, "fake-token-web") + require.NoError(err) + + // Should have a notification + assert.NotEmpty(notifyCh) + drainCh(notifyCh) + + // Remove service + require.NoError(state.RemoveService("web")) + + // Should have a notification + assert.NotEmpty(notifyCh) + drainCh(notifyCh) + + // Stopping should... stop + state.StopNotify(notifyCh) + + // Add a service + err = state.AddService(&structs.NodeService{ + Service: "web", + }, "fake-token-web") + require.NoError(err) + + // Should NOT have a notification + assert.Empty(notifyCh) + drainCh(notifyCh) +} + func TestStateProxyManagement(t *testing.T) { t.Parallel() diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go new file mode 100644 index 0000000000..df434f7a5f --- /dev/null +++ b/agent/proxycfg/manager.go @@ -0,0 +1,350 @@ +package proxycfg + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/local" + "github.com/hashicorp/consul/agent/structs" +) + +var ( + // ErrStopped is returned from Run if the manager instance has already been + // stopped. + ErrStopped = errors.New("manager stopped") + + // ErrStarted is returned from Run if the manager instance has already run. + ErrStarted = errors.New("manager was already run") +) + +// CancelFunc is a type for a returned function that can be called to cancel a +// watch. +type CancelFunc func() + +// Manager is a component that integrates into the agent and manages Connect +// proxy configuration state. This should not be confused with the deprecated +// "managed proxy" concept where the agent supervises the actual proxy process. +// proxycfg.Manager is oblivious to the distinction and manages state for any +// service registered with Kind == connect-proxy. +// +// The Manager ensures that any Connect proxy registered on the agent has all +// the state it needs cached locally via the agent cache. State includes +// certificates, intentions, and service discovery results for any declared +// upstreams. See package docs for more detail. +type Manager struct { + ManagerConfig + + // stateCh is notified for any service changes in local state. We only use + // this to trigger on _new_ service addition since it has no data and we don't + // want to maintain a full copy of the state in order to diff and figure out + // what changed. Luckily each service has it's own WatchCh so we can figure + // out changes and removals with those efficiently. 
+ stateCh chan struct{} + + mu sync.Mutex + started bool + proxies map[string]*state + watchers map[string]map[uint64]chan *ConfigSnapshot +} + +// ManagerConfig holds the required external dependencies for a Manager +// instance. All fields must be set to something valid or the manager will +// panic. The ManagerConfig is passed by value to NewManager so the passed value +// can be mutated safely. +type ManagerConfig struct { + // Cache is the agent's cache instance that can be used to retrieve, store and + // monitor state for the proxies. + Cache *cache.Cache + // state is the agent's local state to be watched for new proxy registrations. + State *local.State + // source describes the current agent's identity, it's used directly for + // prepared query discovery but also indirectly as a way to pass current + // Datacenter name into other request types that need it. This is sufficient + // for now and cleaner than passing the entire RuntimeConfig. + Source *structs.QuerySource + // logger is the agent's logger to be used for logging logs. + Logger *log.Logger +} + +// NewManager constructs a manager from the provided agent cache. +func NewManager(cfg ManagerConfig) (*Manager, error) { + if cfg.Cache == nil || cfg.State == nil || cfg.Source == nil || + cfg.Logger == nil { + return nil, errors.New("all ManagerConfig fields must be provided") + } + m := &Manager{ + ManagerConfig: cfg, + // Single item buffer is enough since there is no data transferred so this + // is "level triggering" and we can't miss actual data. + stateCh: make(chan struct{}, 1), + proxies: make(map[string]*state), + watchers: make(map[string]map[uint64]chan *ConfigSnapshot), + } + return m, nil +} + +// Run is the long-running method that handles state syncing. It should be run +// in it's own goroutine and will continue until a fatal error is hit or Close +// is called. Run will return an error if it is called more than once, or called +// after Close. +func (m *Manager) Run() error { + m.mu.Lock() + alreadyStarted := m.started + m.started = true + stateCh := m.stateCh + m.mu.Unlock() + + // Protect against multiple Run calls. + if alreadyStarted { + return ErrStarted + } + + // Protect against being run after Close. + if stateCh == nil { + return ErrStopped + } + + // Register for notifications about state changes + m.State.Notify(stateCh) + defer m.State.StopNotify(stateCh) + + for { + m.mu.Lock() + + // Traverse the local state and ensure all proxy services are registered + services := m.State.Services() + for svcID, svc := range services { + if svc.Kind != structs.ServiceKindConnectProxy { + continue + } + // TODO(banks): need to work out when to default some stuff. For example + // Proxy.LocalServicePort is practically necessary for any sidecar and can + // default to the port of the sidecar service, but only if it's already + // registered and once we get past here, we don't have enough context to + // know that so we'd need to set it here if not during registration of the + // proxy service. Sidecar Service and managed proxies in the interim can + // do that, but we should validate more generally that that is always + // true. 
+			err := m.ensureProxyServiceLocked(svc, m.State.ServiceToken(svcID))
+			if err != nil {
+				m.Logger.Printf("[ERR] failed to watch proxy service %s: %s", svc.ID,
+					err)
+			}
+		}
+
+		// Now see if any proxies were removed
+		for proxyID := range m.proxies {
+			if _, ok := services[proxyID]; !ok {
+				// Remove them
+				m.removeProxyServiceLocked(proxyID)
+			}
+		}
+
+		m.mu.Unlock()
+
+		// Wait for a state change
+		_, ok := <-stateCh
+		if !ok {
+			// Stopped
+			return nil
+		}
+	}
+}
+
+// ensureProxyServiceLocked adds the proxy to our state, or updates it if it is
+// already present.
+func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string) error {
+	state, ok := m.proxies[ns.ID]
+
+	if ok {
+		if !state.Changed(ns, token) {
+			// No change
+			return nil
+		}
+
+		// We are updating the proxy, close its old state
+		state.Close()
+	}
+
+	var err error
+	state, err = newState(ns, token)
+	if err != nil {
+		return err
+	}
+
+	// Set the necessary dependencies
+	state.logger = m.Logger
+	state.cache = m.Cache
+	state.source = m.Source
+
+	ch, err := state.Watch()
+	if err != nil {
+		return err
+	}
+	m.proxies[ns.ID] = state
+
+	// Start a goroutine that will wait for changes and broadcast them to watchers.
+	go func(ch <-chan ConfigSnapshot) {
+		// Run until ch is closed
+		for snap := range ch {
+			m.notify(&snap)
+		}
+	}(ch)
+
+	return nil
+}
+
+// removeProxyServiceLocked is called when a service deregisters and frees all
+// resources for that service.
+func (m *Manager) removeProxyServiceLocked(proxyID string) {
+	state, ok := m.proxies[proxyID]
+	if !ok {
+		return
+	}
+
+	// Closing state will let the goroutine we started in ensureProxyServiceLocked
+	// finish since the watch chan is closed.
+	state.Close()
+	delete(m.proxies, proxyID)
+
+	// We intentionally leave potential watchers hanging here - there is no new
+	// config for them and closing their channels might be indistinguishable from
+	// an error that they should retry. We rely on them eventually giving up
+	// (because they are in fact not running any more) so the watches get cleaned
+	// up naturally.
+}
+
+func (m *Manager) notify(snap *ConfigSnapshot) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	watchers, ok := m.watchers[snap.ProxyID]
+	if !ok {
+		return
+	}
+
+	for _, ch := range watchers {
+		// Attempt delivery but don't let slow consumers block us forever. They
+		// might miss updates but it's better than breaking everything.
+		//
+		// TODO(banks): should we close their chan here to force them to eventually
+		// notice they are too slow? Not sure if it really helps.
+		select {
+		case ch <- snap:
+		case <-time.After(100 * time.Millisecond):
+		}
+	}
+}
+
+// deliverLatest delivers the snapshot to a watch chan. If the delivery blocks,
+// it will drain the chan and then re-attempt delivery so that a slow consumer
+// gets the latest config earlier. This MUST be called from a method where m.mu
+// is held to be safe since it assumes we are the only goroutine sending on ch.
+func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan *ConfigSnapshot) {
+	// Send if chan is empty
+	select {
+	case ch <- snap:
+		return
+	default:
+	}
+
+	// Not empty, drain the chan of older snapshots and redeliver. For now we only
+	// use 1-buffered chans but this will still work if we change that later.
+OUTER: + for { + select { + case <-ch: + continue + default: + break OUTER + } + } + + // Now send again + select { + case ch <- snap: + return + default: + // This should not be possible since we should be the only sender, enforced + // by m.mu but error and drop the update rather than panic. + m.Logger.Printf("[ERR] proxycfg: failed to deliver ConfigSnapshot to %q", + snap.ProxyID) + } +} + +// Watch registers a watch on a proxy. It might not exist yet in which case this +// will not fail, but no updates will be delivered until the proxy is +// registered. If there is already a valid snapshot in memory, it will be +// delivered immediately. +func (m *Manager) Watch(proxyID string) (<-chan *ConfigSnapshot, CancelFunc) { + m.mu.Lock() + defer m.mu.Unlock() + + // This buffering is crucial otherwise we'd block immediately trying to + // deliver the current snapshot below if we already have one. + ch := make(chan *ConfigSnapshot, 1) + watchers, ok := m.watchers[proxyID] + if !ok { + watchers = make(map[uint64]chan *ConfigSnapshot) + } + idx := uint64(len(watchers)) + watchers[idx] = ch + m.watchers[proxyID] = watchers + + // Deliver the current snapshot immediately if there is one ready + if state, ok := m.proxies[proxyID]; ok { + if snap := state.CurrentSnapshot(); snap != nil { + // We rely on ch being buffered above and that it's not been passed + // anywhere so we must be the only writer so this will never block and + // deadlock. + ch <- snap + } + } + + return ch, func() { + m.mu.Lock() + defer m.mu.Unlock() + m.closeWatchLocked(proxyID, idx) + } +} + +// closeWatchLocked cleans up state related to a single watcher. It assumes the +// lock is held. +func (m *Manager) closeWatchLocked(proxyID string, watchIdx uint64) { + if watchers, ok := m.watchers[proxyID]; ok { + if ch, ok := watchers[watchIdx]; ok { + delete(watchers, watchIdx) + close(ch) + if len(watchers) == 0 { + delete(m.watchers, proxyID) + } + } + } +} + +// Close removes all state and stops all running goroutines. +func (m *Manager) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.stateCh != nil { + close(m.stateCh) + m.stateCh = nil + } + + // Close all current watchers first + for proxyID, watchers := range m.watchers { + for idx := range watchers { + m.closeWatchLocked(proxyID, idx) + } + } + + // Then close all states + for proxyID, state := range m.proxies { + state.Close() + delete(m.proxies, proxyID) + } + return nil +} diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go new file mode 100644 index 0000000000..6183766b0c --- /dev/null +++ b/agent/proxycfg/manager_test.go @@ -0,0 +1,288 @@ +package proxycfg + +import ( + "log" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/local" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/token" +) + +// assertLastReqArgs verifies that each request type had the correct source +// parameters (e.g. Datacenter name) and token. 
+func assertLastReqArgs(t *testing.T, types *TestCacheTypes, token string, source *structs.QuerySource) { + t.Helper() + // Roots needs correct DC and token + rootReq := types.roots.lastReq.Load() + require.IsType(t, rootReq, &structs.DCSpecificRequest{}) + require.Equal(t, token, rootReq.(*structs.DCSpecificRequest).Token) + require.Equal(t, source.Datacenter, rootReq.(*structs.DCSpecificRequest).Datacenter) + + // Leaf needs correct DC and token + leafReq := types.leaf.lastReq.Load() + require.IsType(t, leafReq, &cachetype.ConnectCALeafRequest{}) + require.Equal(t, token, leafReq.(*cachetype.ConnectCALeafRequest).Token) + require.Equal(t, source.Datacenter, leafReq.(*cachetype.ConnectCALeafRequest).Datacenter) + + // Intentions needs correct DC and token + intReq := types.intentions.lastReq.Load() + require.IsType(t, intReq, &structs.IntentionQueryRequest{}) + require.Equal(t, token, intReq.(*structs.IntentionQueryRequest).Token) + require.Equal(t, source.Datacenter, intReq.(*structs.IntentionQueryRequest).Datacenter) +} + +func TestManager_BasicLifecycle(t *testing.T) { + // Use a mocked cache to make life simpler + types := NewTestCacheTypes(t) + c := TestCacheWithTypes(t, types) + + require := require.New(t) + + roots, leaf := TestCerts(t) + + // Setup initial values + types.roots.value.Store(roots) + types.leaf.value.Store(leaf) + types.intentions.value.Store(TestIntentions(t)) + types.health.value.Store( + &structs.IndexedCheckServiceNodes{ + Nodes: TestUpstreamNodes(t), + }) + + logger := log.New(os.Stderr, "", log.LstdFlags) + state := local.NewState(local.Config{}, logger, &token.Store{}) + source := &structs.QuerySource{ + Node: "node1", + Datacenter: "dc1", + } + + // Stub state syncing + state.TriggerSyncChanges = func() {} + + // Create manager + m, err := NewManager(ManagerConfig{c, state, source, logger}) + require.NoError(err) + + // And run it + go func() { + err := m.Run() + require.NoError(err) + }() + + // Register a proxy for "web" + webProxy := &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-sidecar-proxy", + Service: "web-sidecar-proxy", + Port: 9999, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceID: "web", + DestinationServiceName: "web", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 8080, + Config: map[string]interface{}{ + "foo": "bar", + }, + Upstreams: structs.TestUpstreams(t), + }, + } + + // BEFORE we register, we should be able to get a watch channel + wCh, cancel := m.Watch(webProxy.ID) + defer cancel() + + // And it should block with nothing sent on it yet + assertWatchChanBlocks(t, wCh) + + require.NoError(state.AddService(webProxy, "my-token")) + + // We should see the initial config delivered but not until after the + // coalesce timeout + expectSnap := &ConfigSnapshot{ + ProxyID: webProxy.ID, + Address: webProxy.Address, + Port: webProxy.Port, + Proxy: webProxy.Proxy, + Roots: roots, + Leaf: leaf, + UpstreamEndpoints: map[string]structs.CheckServiceNodes{ + "service:db": TestUpstreamNodes(t), + }, + } + start := time.Now() + assertWatchChanRecvs(t, wCh, expectSnap) + require.True(time.Since(start) >= coalesceTimeout) + + assertLastReqArgs(t, types, "my-token", source) + + // Update NodeConfig + webProxy.Port = 7777 + require.NoError(state.AddService(webProxy, "my-token")) + + expectSnap.Port = 7777 + assertWatchChanRecvs(t, wCh, expectSnap) + + // Register a second watcher + wCh2, cancel2 := m.Watch(webProxy.ID) + defer cancel2() + + // New watcher should immediately receive the current state + 
assertWatchChanRecvs(t, wCh2, expectSnap) + + // Change token + require.NoError(state.AddService(webProxy, "other-token")) + assertWatchChanRecvs(t, wCh, expectSnap) + assertWatchChanRecvs(t, wCh2, expectSnap) + + // This is actually sort of timing dependent - the cache background fetcher + // will still be fetching with the old token, but we rely on the fact that our + // mock type will have been blocked on those for a while. + assertLastReqArgs(t, types, "other-token", source) + // Update roots + newRoots, newLeaf := TestCerts(t) + newRoots.Roots = append(newRoots.Roots, roots.Roots...) + types.roots.Set(newRoots) + + // Expect new roots in snapshot + expectSnap.Roots = newRoots + assertWatchChanRecvs(t, wCh, expectSnap) + assertWatchChanRecvs(t, wCh2, expectSnap) + + // Update leaf + types.leaf.Set(newLeaf) + + // Expect new roots in snapshot + expectSnap.Leaf = newLeaf + assertWatchChanRecvs(t, wCh, expectSnap) + assertWatchChanRecvs(t, wCh2, expectSnap) + + // Remove the proxy + state.RemoveService(webProxy.ID) + + // Chan should NOT close + assertWatchChanBlocks(t, wCh) + assertWatchChanBlocks(t, wCh2) + + // Re-add the proxy with another new port + webProxy.Port = 3333 + require.NoError(state.AddService(webProxy, "other-token")) + + // Same watch chan should be notified again + expectSnap.Port = 3333 + assertWatchChanRecvs(t, wCh, expectSnap) + assertWatchChanRecvs(t, wCh2, expectSnap) + + // Cancel watch + cancel() + + // Watch chan should be closed + assertWatchChanRecvs(t, wCh, nil) + + // We specifically don't remove the proxy or cancel the second watcher to + // ensure both are cleaned up by close. + require.NoError(m.Close()) + + // Sanity check the state is clean + m.mu.Lock() + defer m.mu.Unlock() + require.Len(m.proxies, 0) + require.Len(m.watchers, 0) +} + +func assertWatchChanBlocks(t *testing.T, ch <-chan *ConfigSnapshot) { + t.Helper() + + select { + case <-ch: + t.Fatal("Should be nothing sent on watch chan yet") + default: + } +} + +func assertWatchChanRecvs(t *testing.T, ch <-chan *ConfigSnapshot, expect *ConfigSnapshot) { + t.Helper() + + select { + case got, ok := <-ch: + require.Equal(t, expect, got) + if expect == nil { + require.False(t, ok, "watch chan should be closed") + } + case <-time.After(50*time.Millisecond + coalesceTimeout): + t.Fatal("recv timeout") + } +} + +func TestManager_deliverLatest(t *testing.T) { + // None of these need to do anything to test this method just be valid + logger := log.New(os.Stderr, "", log.LstdFlags) + cfg := ManagerConfig{ + Cache: cache.New(nil), + State: local.NewState(local.Config{}, logger, &token.Store{}), + Source: &structs.QuerySource{ + Node: "node1", + Datacenter: "dc1", + }, + Logger: logger, + } + require := require.New(t) + + m, err := NewManager(cfg) + require.NoError(err) + + snap1 := &ConfigSnapshot{ + ProxyID: "test-proxy", + Port: 1111, + } + snap2 := &ConfigSnapshot{ + ProxyID: "test-proxy", + Port: 2222, + } + + // Put an overall time limit on this test case so we don't have to guard every + // call to ensure the whole test doesn't deadlock. 
+ time.AfterFunc(100*time.Millisecond, func() { + t.Fatal("test timed out") + }) + + // test 1 buffered chan + ch1 := make(chan *ConfigSnapshot, 1) + + // Sending to an unblocked chan should work + m.deliverLatest(snap1, ch1) + + // Check it was delivered + require.Equal(snap1, <-ch1) + + // Now send both without reading simulating a slow client + m.deliverLatest(snap1, ch1) + m.deliverLatest(snap2, ch1) + + // Check we got the _second_ one + require.Equal(snap2, <-ch1) + + // Same again for 5-buffered chan + ch5 := make(chan *ConfigSnapshot, 5) + + // Sending to an unblocked chan should work + m.deliverLatest(snap1, ch5) + + // Check it was delivered + require.Equal(snap1, <-ch5) + + // Now send enough to fill the chan simulating a slow client + for i := 0; i < 5; i++ { + m.deliverLatest(snap1, ch5) + } + m.deliverLatest(snap2, ch5) + + // Check we got the _second_ one + require.Equal(snap2, <-ch5) +} diff --git a/agent/proxycfg/proxycfg.go b/agent/proxycfg/proxycfg.go new file mode 100644 index 0000000000..f4a2f2499b --- /dev/null +++ b/agent/proxycfg/proxycfg.go @@ -0,0 +1,53 @@ +// Package proxycfg provides a component that monitors local agent state for +// Connect proxy service registrations and maintains the necessary cache state +// for those proxies locally. Local cache state keeps pull based proxies (e.g. +// the built in one) performant even on first request/startup, and allows for +// push-based proxy APIs (e.g. xDS for Envoy) to be notified of updates to the +// proxy configuration. +// +// The relationship with other agent components looks like this: +// +// +------------------------------------------+ +// | AGENT | +// | | +// | +--------+ 1. +----------+ | +// | | local |<-----+ proxycfg |<--------+ | +// | | state +----->| Manager |<---+ | | +// | +--------+ 2. +^---+-----+ | | | +// | 5.| | | | | +// | +----------+ | +-------+--+ |4. | +// | | +->| proxycfg | | | +// | | 3.| | State | | | +// | | | +----------+ | | +// | | | | | +// | | | +----------+ | | +// | | +->| proxycfg +-+ | +// | | | State | | +// | | +----------+ | +// | |6. | +// | +----v---+ | +// | | xDS | | +// | | Server | | +// | +--------+ | +// | | +// +------------------------------------------+ +// +// 1. Manager watches local state for changes. +// 2. On local state change manager is notified and iterates through state +// looking for proxy service registrations. +// 3. For each proxy service registered, the manager maintains a State +// instance, recreating on change, removing when deregistered. +// 4. State instance copies the parts of the the proxy service registration +// needed to configure proxy, and sets up blocking watches on the local +// agent cache for all remote state needed: root and leaf certs, intentions, +// and service discovery results for the specified upstreams. This ensures +// these results are always in local cache for "pull" based proxies like the +// built-in one. +// 5. If needed, pull-based proxy config APIs like the xDS server can Watch the +// config for a given proxy service. +// 6. Watchers get notified every time something changes the current snapshot +// of config for the proxy. That might be changes to the registration, +// certificate rotations, changes to the upstreams required (needing +// different listener config), or changes to the service discovery results +// for any upstream (e.g. new instance of upstream service came up). 
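+//
+// As a rough illustration of steps 5 and 6 (the consumer side is not part of
+// this package, so the surrounding setup here is assumed), a push-based
+// watcher would look something like:
+//
+//	snapCh, cancel := manager.Watch(proxyID)
+//	defer cancel()
+//	for snap := range snapCh {
+//		// snap is a point-in-time coherent *ConfigSnapshot; its roots, leaf
+//		// cert and upstream endpoints can be read without further locking.
+//		pushConfigToProxy(snap) // hypothetical helper
+//	}
+//
+// The snapshot channel is closed when the Manager is closed or the watch is
+// cancelled; a deregistered proxy simply stops producing new snapshots.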
+package proxycfg diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go new file mode 100644 index 0000000000..c16b100551 --- /dev/null +++ b/agent/proxycfg/snapshot.go @@ -0,0 +1,36 @@ +package proxycfg + +import ( + "github.com/hashicorp/consul/agent/structs" + "github.com/mitchellh/copystructure" +) + +// ConfigSnapshot captures all the resulting config needed for a proxy instance. +// It is meant to be point-in-time coherent and is used to deliver the current +// config state to observers who need it to be pushed in (e.g. XDS server). +type ConfigSnapshot struct { + ProxyID string + Address string + Port int + Proxy structs.ConnectProxyConfig + Roots *structs.IndexedCARoots + Leaf *structs.IssuedCert + UpstreamEndpoints map[string]structs.CheckServiceNodes + + // Skip intentions for now as we don't push those down yet, just pre-warm them. +} + +// Valid returns whether or not the snapshot has all required fields filled yet. +func (s *ConfigSnapshot) Valid() bool { + return s.Roots != nil && s.Leaf != nil +} + +// Clone makes a deep copy of the snapshot we can send to other goroutines +// without worrying that they will racily read or mutate shared maps etc. +func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) { + snapCopy, err := copystructure.Copy(s) + if err != nil { + return nil, err + } + return snapCopy.(*ConfigSnapshot), nil +} diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go new file mode 100644 index 0000000000..956b4f035d --- /dev/null +++ b/agent/proxycfg/state.go @@ -0,0 +1,346 @@ +package proxycfg + +import ( + "context" + "errors" + "fmt" + "log" + "reflect" + "strings" + "time" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/structs" + "github.com/mitchellh/copystructure" +) + +const ( + coalesceTimeout = 200 * time.Millisecond + rootsWatchID = "roots" + leafWatchID = "leaf" + intentionsWatchID = "intentions" + serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":" + preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":" +) + +// state holds all the state needed to maintain the config for a registered +// connect-proxy service. When a proxy registration is changed, the entire state +// is discarded and a new one created. +type state struct { + // logger, source and cache are required to be set before calling Watch. + logger *log.Logger + source *structs.QuerySource + cache *cache.Cache + + // ctx and cancel store the context created during initWatches call + ctx context.Context + cancel func() + + proxyID string + address string + port int + proxyCfg structs.ConnectProxyConfig + token string + + ch chan cache.UpdateEvent + snapCh chan ConfigSnapshot + reqCh chan chan *ConfigSnapshot +} + +// newState populates the state struct by copying relevant fields from the +// NodeService and Token. We copy so that we can use them in a separate +// goroutine later without reasoning about races with the NodeService passed +// (especially for embedded fields like maps and slices). +// +// The returned state needs it's required dependencies to be set before Watch +// can be called. 
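+//
+// A sketch of the intended call sequence, mirroring what the Manager's
+// ensureProxyServiceLocked does (logger, c and src stand in for whatever
+// dependencies the caller already holds):
+//
+//	s, err := newState(ns, token)
+//	// ... handle err ...
+//	s.logger, s.cache, s.source = logger, c, src
+//	snapCh, err := s.Watch()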
+func newState(ns *structs.NodeService, token string) (*state, error) { + if ns.Kind != structs.ServiceKindConnectProxy { + return nil, errors.New("not a connect-proxy") + } + + // Copy the config map + proxyCfgRaw, err := copystructure.Copy(ns.Proxy) + if err != nil { + return nil, err + } + proxyCfg, ok := proxyCfgRaw.(structs.ConnectProxyConfig) + if !ok { + return nil, errors.New("failed to copy proxy config") + } + + return &state{ + proxyID: ns.ID, + address: ns.Address, + port: ns.Port, + proxyCfg: proxyCfg, + token: token, + // 10 is fairly arbitrary here but allow for the 3 mandatory and a + // reasonable number of upstream watches to all deliver their initial + // messages in parallel without blocking the cache.Notify loops. It's not a + // huge deal if we do for a short period so we don't need to be more + // conservative to handle larger numbers of upstreams correctly but gives + // some head room for normal operation to be non-blocking in most typical + // cases. + ch: make(chan cache.UpdateEvent, 10), + snapCh: make(chan ConfigSnapshot, 1), + reqCh: make(chan chan *ConfigSnapshot, 1), + }, nil +} + +// Watch initialised watches on all necessary cache data for the current proxy +// registration state and returns a chan to observe updates to the +// ConfigSnapshot that contains all necessary config state. The chan is closed +// when the state is Closed. +func (s *state) Watch() (<-chan ConfigSnapshot, error) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + + err := s.initWatches() + if err != nil { + s.cancel() + return nil, err + } + + go s.run() + + return s.snapCh, nil +} + +// Close discards the state and stops any long-running watches. +func (s *state) Close() error { + if s.cancel != nil { + s.cancel() + } + return nil +} + +// initWatches sets up the watches needed based on current proxy registration +// state. +func (s *state) initWatches() error { + // Watch for root changes + err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ + Datacenter: s.source.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.token}, + }, rootsWatchID, s.ch) + if err != nil { + return err + } + + // Watch the leaf cert + err = s.cache.Notify(s.ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{ + Datacenter: s.source.Datacenter, + Token: s.token, + Service: s.proxyCfg.DestinationServiceName, + }, leafWatchID, s.ch) + if err != nil { + return err + } + + // Watch for intention updates + err = s.cache.Notify(s.ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{ + Datacenter: s.source.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.token}, + Match: &structs.IntentionQueryMatch{ + Type: structs.IntentionMatchDestination, + Entries: []structs.IntentionMatchEntry{ + { + Namespace: structs.IntentionDefaultNamespace, + Name: s.proxyCfg.DestinationServiceName, + }, + }, + }, + }, intentionsWatchID, s.ch) + if err != nil { + return err + } + + // Watch for updates to service endpoints for all upstreams + for _, u := range s.proxyCfg.Upstreams { + dc := s.source.Datacenter + if u.Datacenter != "" { + dc = u.Datacenter + } + + switch u.DestinationType { + case structs.UpstreamDestTypePreparedQuery: + // TODO(banks): prepared queries don't support blocking. We need to come + // up with an alternative to Notify that will poll at a sensible rate. 
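+			//
+			// One possible shape (purely illustrative, nothing below is
+			// implemented in this change): a goroutine that re-runs the query on
+			// a fixed interval and feeds results into s.ch as cache.UpdateEvents
+			// keyed by u.Identifier(), e.g.:
+			//
+			//	go func() {
+			//		for {
+			//			select {
+			//			case <-s.ctx.Done():
+			//				return
+			//			case <-time.After(pollInterval): // pollInterval is hypothetical
+			//				// re-run the query and send cache.UpdateEvent{
+			//				//   CorrelationID: u.Identifier(), Result: res} on s.ch
+			//			}
+			//		}
+			//	}()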
+ + // err = c.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{ + // Datacenter: dc, + // QueryOptions: structs.QueryOptions{Token: token}, + // QueryIDOrName: u.DestinationName, + // Connect: true, + // }, u.Identifier(), ch) + case structs.UpstreamDestTypeService: + fallthrough + case "": // Treat unset as the default Service type + err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ + Datacenter: dc, + QueryOptions: structs.QueryOptions{Token: s.token}, + ServiceName: u.DestinationName, + Connect: true, + }, u.Identifier(), s.ch) + + if err != nil { + return err + } + + default: + return fmt.Errorf("unknown upstream type: %q", u.DestinationType) + } + } + return nil +} + +func (s *state) run() { + // Close the channel we return from Watch when we stop so consumers can stop + // watching and clean up their goroutines. It's important we do this here and + // not in Close since this routine sends on this chan and so might panic if it + // gets closed from another goroutine. + defer close(s.snapCh) + + snap := ConfigSnapshot{ + ProxyID: s.proxyID, + Address: s.address, + Port: s.port, + Proxy: s.proxyCfg, + UpstreamEndpoints: make(map[string]structs.CheckServiceNodes), + } + // This turns out to be really fiddly/painful by just using time.Timer.C + // directly in the code below since you can't detect when a timer is stopped + // vs waiting in order to know to reset it. So just use a chan to send + // ourselves messages. + sendCh := make(chan struct{}) + var coalesceTimer *time.Timer + + for { + select { + case <-s.ctx.Done(): + return + case u := <-s.ch: + if err := s.handleUpdate(u, &snap); err != nil { + s.logger.Printf("[ERR] %s watch error: %s", u.CorrelationID, err) + continue + } + + case <-sendCh: + // Make a deep copy of snap so we don't mutate any of the embedded structs + // etc on future updates. + snapCopy, err := snap.Clone() + if err != nil { + s.logger.Printf("[ERR] Failed to copy config snapshot for proxy %s", + s.proxyID) + continue + } + s.snapCh <- *snapCopy + // Allow the next change to trigger a send + coalesceTimer = nil + + // Skip rest of loop - there is nothing to send since nothing changed on + // this iteration + continue + + case replyCh := <-s.reqCh: + if !snap.Valid() { + // Not valid yet just respond with nil and move on to next task. + replyCh <- nil + continue + } + // Make a deep copy of snap so we don't mutate any of the embedded structs + // etc on future updates. + snapCopy, err := snap.Clone() + if err != nil { + s.logger.Printf("[ERR] Failed to copy config snapshot for proxy %s", + s.proxyID) + continue + } + replyCh <- snapCopy + + // Skip rest of loop - there is nothing to send since nothing changed on + // this iteration + continue + } + + // Check if snap is complete enough to be a valid config to deliver to a + // proxy yet. + if snap.Valid() { + // Don't send it right away, set a short timer that will wait for updates + // from any of the other cache values and deliver them all together. + if coalesceTimer == nil { + coalesceTimer = time.AfterFunc(coalesceTimeout, func() { + // This runs in another goroutine so we can't just do the send + // directly here as access to snap is racy. Instead, signal the main + // loop above. 
+ sendCh <- struct{}{} + }) + } + } + } +} + +func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error { + switch u.CorrelationID { + case rootsWatchID: + roots, ok := u.Result.(*structs.IndexedCARoots) + if !ok { + return fmt.Errorf("invalid type for roots response: %T", u.Result) + } + snap.Roots = roots + case leafWatchID: + leaf, ok := u.Result.(*structs.IssuedCert) + if !ok { + return fmt.Errorf("invalid type for leaf response: %T", u.Result) + } + snap.Leaf = leaf + case intentionsWatchID: + // Not in snapshot currently, no op + default: + // Service discovery result, figure out which type + switch { + case strings.HasPrefix(u.CorrelationID, serviceIDPrefix): + resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) + if !ok { + return fmt.Errorf("invalid type for service response: %T", u.Result) + } + snap.UpstreamEndpoints[u.CorrelationID] = resp.Nodes + + case strings.HasPrefix(u.CorrelationID, preparedQueryIDPrefix): + resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse) + if !ok { + return fmt.Errorf("invalid type for prepared query response: %T", u.Result) + } + snap.UpstreamEndpoints[u.CorrelationID] = resp.Nodes + + default: + return errors.New("unknown correlation ID") + } + } + return nil +} + +// CurrentSnapshot synchronously returns the current ConfigSnapshot if there is +// one ready. If we don't have one yet because not all necessary parts have been +// returned (i.e. both roots and leaf cert), nil is returned. +func (s *state) CurrentSnapshot() *ConfigSnapshot { + // Make a chan for the response to be sent on + ch := make(chan *ConfigSnapshot, 1) + s.reqCh <- ch + // Wait for the response + return <-ch +} + +// Changed returns whether or not the passed NodeService has had any of the +// fields we care about for config state watching changed or a different token. 
+func (s *state) Changed(ns *structs.NodeService, token string) bool { + if ns == nil { + return true + } + return ns.Kind != structs.ServiceKindConnectProxy || + s.proxyID != ns.ID || + s.address != ns.Address || + s.port != ns.Port || + !reflect.DeepEqual(s.proxyCfg, ns.Proxy) || + s.token != token +} diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go new file mode 100644 index 0000000000..cfdccfd6d5 --- /dev/null +++ b/agent/proxycfg/state_test.go @@ -0,0 +1,114 @@ +package proxycfg + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" +) + +func TestStateChanged(t *testing.T) { + tests := []struct { + name string + ns *structs.NodeService + token string + mutate func(ns structs.NodeService, token string) (*structs.NodeService, string) + want bool + }{ + { + name: "nil node service", + ns: structs.TestNodeServiceProxy(t), + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + return nil, token + }, + want: true, + }, + { + name: "same service", + ns: structs.TestNodeServiceProxy(t), + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + return &ns, token + }, want: false, + }, + { + name: "same service, different token", + ns: structs.TestNodeServiceProxy(t), + token: "foo", + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + return &ns, "bar" + }, + want: true, + }, + { + name: "different service ID", + ns: structs.TestNodeServiceProxy(t), + token: "foo", + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + ns.ID = "badger" + return &ns, token + }, + want: true, + }, + { + name: "different address", + ns: structs.TestNodeServiceProxy(t), + token: "foo", + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + ns.Address = "10.10.10.10" + return &ns, token + }, + want: true, + }, + { + name: "different port", + ns: structs.TestNodeServiceProxy(t), + token: "foo", + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + ns.Port = 12345 + return &ns, token + }, + want: true, + }, + { + name: "different service kind", + ns: structs.TestNodeServiceProxy(t), + token: "foo", + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + ns.Kind = "" + return &ns, token + }, + want: true, + }, + { + name: "different proxy target", + ns: structs.TestNodeServiceProxy(t), + token: "foo", + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + ns.Proxy.DestinationServiceName = "badger" + return &ns, token + }, + want: true, + }, + { + name: "different proxy upstreams", + ns: structs.TestNodeServiceProxy(t), + token: "foo", + mutate: func(ns structs.NodeService, token string) (*structs.NodeService, string) { + ns.Proxy.Upstreams = nil + return &ns, token + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + state, err := newState(tt.ns, tt.token) + require.NoError(err) + otherNS, otherToken := tt.mutate(*tt.ns, tt.token) + require.Equal(tt.want, state.Changed(otherNS, otherToken)) + }) + } +} diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go new file mode 100644 index 0000000000..638bad21ab --- /dev/null +++ b/agent/proxycfg/testing.go @@ -0,0 +1,246 @@ +package proxycfg + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/consul/agent/cache" + cachetype 
"github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/structs" + "github.com/mitchellh/go-testing-interface" + "github.com/stretchr/testify/require" +) + +// TestCacheTypes encapsulates all the different cache types proxycfg.State will +// watch/request for contolling one during testing. +type TestCacheTypes struct { + roots *ControllableCacheType + leaf *ControllableCacheType + intentions *ControllableCacheType + health *ControllableCacheType + query *ControllableCacheType +} + +// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that +// proxycfg will watch suitable for testing a proxycfg.State or Manager. +func NewTestCacheTypes(t testing.T) *TestCacheTypes { + t.Helper() + ct := &TestCacheTypes{ + roots: NewControllableCacheType(t), + leaf: NewControllableCacheType(t), + intentions: NewControllableCacheType(t), + health: NewControllableCacheType(t), + query: NewControllableCacheType(t), + } + ct.query.blocking = false + return ct +} + +// TestCacheWithTypes registers ControllableCacheTypes for all types that +// proxycfg will watch suitable for testing a proxycfg.State or Manager. +func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache { + c := cache.TestCache(t) + c.RegisterType(cachetype.ConnectCARootName, types.roots, &cache.RegisterOptions{ + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 10 * time.Minute, + }) + c.RegisterType(cachetype.ConnectCALeafName, types.leaf, &cache.RegisterOptions{ + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 10 * time.Minute, + }) + c.RegisterType(cachetype.IntentionMatchName, types.intentions, &cache.RegisterOptions{ + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 10 * time.Minute, + }) + c.RegisterType(cachetype.HealthServicesName, types.health, &cache.RegisterOptions{ + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 10 * time.Minute, + }) + c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{ + Refresh: false, + }) + return c +} + +// TestCerts genereates a CA and Leaf suitable for returning as mock CA +// root/leaf cache requests. +func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) { + t.Helper() + + ca := connect.TestCA(t, nil) + roots := &structs.IndexedCARoots{ + ActiveRootID: ca.ID, + TrustDomain: connect.TestClusterID, + Roots: []*structs.CARoot{ca}, + } + return roots, TestLeafForCA(t, ca) +} + +// TestLeafForCA genereates new Leaf suitable for returning as mock CA +// leaf cache resonse, signed by an existing CA. +func TestLeafForCA(t testing.T, ca *structs.CARoot) *structs.IssuedCert { + leafPEM, pkPEM := connect.TestLeaf(t, "web", ca) + + leafCert, err := connect.ParseCert(leafPEM) + require.NoError(t, err) + + return &structs.IssuedCert{ + SerialNumber: connect.HexString(leafCert.SerialNumber.Bytes()), + CertPEM: leafPEM, + PrivateKeyPEM: pkPEM, + Service: "web", + ServiceURI: leafCert.URIs[0].String(), + ValidAfter: leafCert.NotBefore, + ValidBefore: leafCert.NotAfter, + } +} + +// TestIntentions returns a sample intentions match result useful to +// mocking service discovery cache results. 
+func TestIntentions(t testing.T) *structs.IndexedIntentionMatches {
+	return &structs.IndexedIntentionMatches{
+		Matches: []structs.Intentions{
+			[]*structs.Intention{
+				&structs.Intention{
+					ID:              "foo",
+					SourceNS:        "default",
+					SourceName:      "billing",
+					DestinationNS:   "default",
+					DestinationName: "web",
+					Action:          structs.IntentionActionAllow,
+				},
+			},
+		},
+	}
+}
+
+// TestUpstreamNodes returns a sample service discovery result useful for
+// mocking service discovery cache results.
+func TestUpstreamNodes(t testing.T) structs.CheckServiceNodes {
+	return structs.CheckServiceNodes{
+		structs.CheckServiceNode{
+			Node: &structs.Node{
+				ID:         "test1",
+				Node:       "test1",
+				Address:    "10.10.1.1",
+				Datacenter: "dc1",
+			},
+			Service: structs.TestNodeService(t),
+		},
+		structs.CheckServiceNode{
+			Node: &structs.Node{
+				ID:         "test2",
+				Node:       "test2",
+				Address:    "10.10.1.2",
+				Datacenter: "dc1",
+			},
+			Service: structs.TestNodeService(t),
+		},
+	}
+}
+
+// TestConfigSnapshot returns a fully populated snapshot.
+func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
+	roots, leaf := TestCerts(t)
+	return &ConfigSnapshot{
+		ProxyID: "web-sidecar-proxy",
+		Address: "0.0.0.0",
+		Port:    9999,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceID:   "web",
+			DestinationServiceName: "web",
+			LocalServiceAddress:    "127.0.0.1",
+			LocalServicePort:       8080,
+			Config: map[string]interface{}{
+				"foo": "bar",
+			},
+			Upstreams: structs.TestUpstreams(t),
+		},
+		Roots: roots,
+		Leaf:  leaf,
+		UpstreamEndpoints: map[string]structs.CheckServiceNodes{
+			"service:db": TestUpstreamNodes(t),
+		},
+	}
+}
+
+// ControllableCacheType is a cache.Type that simulates a typical blocking RPC
+// but lets us easily control the responses and when they are delivered.
+type ControllableCacheType struct {
+	index uint64
+	value atomic.Value
+	// Need a condvar to trigger all blocking requests (there might be multiple
+	// for the same type due to background refresh and timing issues) when values
+	// change. Chans make it nondeterministic which one triggers, or need extra
+	// locking to coordinate replacing after close etc.
+	triggerMu sync.Mutex
+	trigger   *sync.Cond
+	blocking  bool
+	lastReq   atomic.Value
+}
+
+// NewControllableCacheType returns a cache.Type that can be controlled for
+// testing.
+func NewControllableCacheType(t testing.T) *ControllableCacheType {
+	c := &ControllableCacheType{
+		index:    5,
+		blocking: true,
+	}
+	c.trigger = sync.NewCond(&c.triggerMu)
+	return c
+}
+
+// Set sets the response value to be returned from subsequent cache gets for the
+// type.
+func (ct *ControllableCacheType) Set(value interface{}) {
+	atomic.AddUint64(&ct.index, 1)
+	ct.value.Store(value)
+	ct.triggerMu.Lock()
+	ct.trigger.Broadcast()
+	ct.triggerMu.Unlock()
+}
+
+// Fetch implements cache.Type. It simulates blocking or non-blocking queries.
+func (ct *ControllableCacheType) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
+
+	index := atomic.LoadUint64(&ct.index)
+
+	ct.lastReq.Store(req)
+
+	shouldBlock := ct.blocking && opts.MinIndex > 0 && opts.MinIndex == index
+	if shouldBlock {
+		// Wait for return to be triggered. We ignore timeouts based on opts.Timeout
+		// since in practice they will always be way longer than our tests run for,
+		// and the caller can simulate a timeout by triggering a return without
+		// changing the index or value.
+ ct.triggerMu.Lock() + ct.trigger.Wait() + ct.triggerMu.Unlock() + } + + // reload index as it probably got bumped + index = atomic.LoadUint64(&ct.index) + val := ct.value.Load() + + if err, ok := val.(error); ok { + return cache.FetchResult{ + Value: nil, + Index: index, + }, err + } + return cache.FetchResult{ + Value: val, + Index: index, + }, nil +} + +// SupportsBlocking implements cache.Type +func (ct *ControllableCacheType) SupportsBlocking() bool { + return ct.blocking +} diff --git a/agent/structs/connect.go b/agent/structs/connect.go index 0a21b520c4..dc0010478b 100644 --- a/agent/structs/connect.go +++ b/agent/structs/connect.go @@ -114,8 +114,10 @@ type ConnectManagedProxy struct { // ConnectManagedProxy.Config that we care about. They are all optional anyway // and this is used to decode them with mapstructure. type ConnectManagedProxyConfig struct { - BindAddress string `mapstructure:"bind_address"` - BindPort int `mapstructure:"bind_port"` + BindAddress string `mapstructure:"bind_address"` + BindPort int `mapstructure:"bind_port"` + LocalServiceAddress string `mapstructure:"local_service_address"` + LocalServicePort int `mapstructure:"local_service_port"` } // ParseConfig attempts to read the fields we care about from the otherwise diff --git a/agent/structs/connect_proxy_config.go b/agent/structs/connect_proxy_config.go index 80cada307c..9f5f2edddd 100644 --- a/agent/structs/connect_proxy_config.go +++ b/agent/structs/connect_proxy_config.go @@ -157,6 +157,28 @@ func (u *Upstream) ToAPI() api.Upstream { } } +// Identifier returns a string representation that uniquely identifies the +// upstream in a canonical but human readable way. +func (u *Upstream) Identifier() string { + name := u.DestinationName + if u.DestinationNamespace != "" && u.DestinationNamespace != "default" { + name = u.DestinationNamespace + "/" + u.DestinationName + } + if u.Datacenter != "" { + name += "?dc=" + u.Datacenter + } + typ := u.DestinationType + if typ == "" { + typ = UpstreamDestTypeService + } + return typ + ":" + name +} + +// String implements Stringer by returning the Identifier. +func (u *Upstream) String() string { + return u.Identifier() +} + // UpstreamFromAPI is a helper for converting api.Upstream to Upstream. func UpstreamFromAPI(u api.Upstream) Upstream { return Upstream{