diff --git a/agent/ae/ae.go b/agent/ae/ae.go
index b150001b6e..fb028ebfe2 100644
--- a/agent/ae/ae.go
+++ b/agent/ae/ae.go
@@ -306,8 +306,9 @@ func (s *StateSyncer) Paused() bool {
 	return s.paused != 0
 }
 
-// Resume re-enables sync runs.
-func (s *StateSyncer) Resume() {
+// Resume re-enables sync runs. It returns true if it was the last pause/resume
+// pair on the stack and so actually caused the state syncer to resume.
+func (s *StateSyncer) Resume() bool {
 	s.pauseLock.Lock()
 	s.paused--
 	if s.paused < 0 {
@@ -318,4 +319,5 @@ func (s *StateSyncer) Resume() {
 	if trigger {
 		s.SyncChanges.Trigger()
 	}
+	return trigger
 }
diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go
index a80532d80e..960f4004ba 100644
--- a/agent/ae/ae_test.go
+++ b/agent/ae/ae_test.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/consul/lib"
+	"github.com/stretchr/testify/assert"
 )
 
 func TestAE_scaleFactor(t *testing.T) {
@@ -47,14 +48,16 @@ func TestAE_Pause_nestedPauseResume(t *testing.T) {
 	if l.Paused() != true {
 		t.Fatal("syncer should STILL be Paused after second call to Pause()")
 	}
-	l.Resume()
+	gotR := l.Resume()
 	if l.Paused() != true {
 		t.Fatal("syncer should STILL be Paused after FIRST call to Resume()")
 	}
-	l.Resume()
+	assert.False(t, gotR)
+	gotR = l.Resume()
 	if l.Paused() != false {
 		t.Fatal("syncer should NOT be Paused after SECOND call to Resume()")
 	}
+	assert.True(t, gotR)
 
 	defer func() {
 		err := recover()
diff --git a/agent/agent.go b/agent/agent.go
index b6f3d6a9ae..23e7b90d6e 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -127,6 +127,11 @@ type Agent struct {
 	// and the remote state.
 	sync *ae.StateSyncer
 
+	// syncMu and syncCh are used to coordinate agent endpoints that are blocking
+	// on local state during a config reload.
+	syncMu sync.Mutex
+	syncCh chan struct{}
+
 	// cache is the in-memory cache for data the Agent requests.
 	cache *cache.Cache
 
@@ -1490,14 +1495,53 @@ func (a *Agent) StartSync() {
 	a.logger.Printf("[INFO] agent: started state syncer")
 }
 
-// PauseSync is used to pause anti-entropy while bulk changes are make
+// PauseSync is used to pause anti-entropy while bulk changes are made. It also
+// sets state that agent-local watches use to "ride out" config reloads and bulk
+// updates which might spuriously unload state and reload it again.
 func (a *Agent) PauseSync() {
+	// Do this outside of the lock as it has its own locking
	a.sync.Pause()
+
+	// Coordinate local state watchers
+	a.syncMu.Lock()
+	defer a.syncMu.Unlock()
+	if a.syncCh == nil {
+		a.syncCh = make(chan struct{})
+	}
 }
 
 // ResumeSync is used to unpause anti-entropy after bulk changes are make
 func (a *Agent) ResumeSync() {
-	a.sync.Resume()
+	// a.sync maintains a stack/ref count of Pause calls since we call
+	// Pause/Resume in a nested way during a reload and AddService. We only want to
+	// trigger local state watchers if this Resume call actually started sync back
+	// up again (i.e. was the last resume on the stack). We could check that
+	// separately with a.sync.Paused but that is racy since another Pause call
+	// might be made between our Resume and checking Paused.
+	resumed := a.sync.Resume()
+
+	if !resumed {
+		// Return early so we don't notify local watchers until we are actually
+		// resumed.
+		return
+	}
+
+	// Coordinate local state watchers
+	a.syncMu.Lock()
+	defer a.syncMu.Unlock()
+
+	if a.syncCh != nil {
+		close(a.syncCh)
+		a.syncCh = nil
+	}
+}
+
+// syncPausedCh returns either a channel or nil. If nil, sync is not paused. If
+// non-nil, the channel will be closed when sync resumes.
+func (a *Agent) syncPausedCh() <-chan struct{} {
+	a.syncMu.Lock()
+	defer a.syncMu.Unlock()
+	return a.syncCh
 }
 
 // GetLANCoordinate returns the coordinates of this node in the local pools
diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go
index e3038d57f9..d35c069cc1 100644
--- a/agent/agent_endpoint.go
+++ b/agent/agent_endpoint.go
@@ -1,8 +1,10 @@
 package agent
 
 import (
+	"errors"
 	"fmt"
 	"log"
+	"net"
 	"net/http"
 	"net/url"
 	"strconv"
@@ -230,6 +232,154 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
 	return agentSvcs, nil
 }
 
+// GET /v1/agent/service/:service_id
+//
+// Returns the service definition for a single local service and allows a
+// blocking watch using hash-based blocking.
+func (s *HTTPServer) AgentService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	// Get the proxy ID. Note that this is the ID of a proxy's service instance.
+	id := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/")
+
+	// DEPRECATED(managed-proxies) - remove this whole hack.
+	//
+	// Support managed proxies until they are removed entirely. Since the built-in
+	// proxy will now use this endpoint, in order to not break managed proxies in
+	// the interim until they are removed, we need to mirror the default-setting
+	// behaviour they had. Rather than thread that through this whole method as
+	// special cases that need to be unwound later (and duplicate logic in the
+	// proxy config endpoint) just defer to that and then translate the response.
+	if managedProxy := s.agent.State.Proxy(id); managedProxy != nil {
+		// This is for a managed proxy, use the old endpoint's behaviour
+		req.URL.Path = "/v1/agent/connect/proxy/" + id
+		obj, err := s.AgentConnectProxyConfig(resp, req)
+		if err != nil {
+			return obj, err
+		}
+		proxyCfg, ok := obj.(*api.ConnectProxyConfig)
+		if !ok {
+			return nil, errors.New("internal error")
+		}
+		// These are all set by defaults so type checks are just sanity checks that
+		// should never fail.
+		port, ok := proxyCfg.Config["bind_port"].(int)
+		if !ok || port < 1 {
+			return nil, errors.New("invalid proxy config")
+		}
+		addr, ok := proxyCfg.Config["bind_address"].(string)
+		if !ok || addr == "" {
+			return nil, errors.New("invalid proxy config")
+		}
+		localAddr, ok := proxyCfg.Config["local_service_address"].(string)
+		if !ok || localAddr == "" {
+			return nil, errors.New("invalid proxy config")
+		}
+		// Old local_service_address was a host:port
+		localAddress, localPortRaw, err := net.SplitHostPort(localAddr)
+		if err != nil {
+			return nil, err
+		}
+		localPort, err := strconv.Atoi(localPortRaw)
+		if err != nil {
+			return nil, err
+		}
+
+		reply := &api.AgentService{
+			Kind:        api.ServiceKindConnectProxy,
+			ID:          proxyCfg.ProxyServiceID,
+			Service:     managedProxy.Proxy.ProxyService.Service,
+			Port:        port,
+			Address:     addr,
+			ContentHash: proxyCfg.ContentHash,
+			Proxy: &api.AgentServiceConnectProxyConfig{
+				DestinationServiceName: proxyCfg.TargetServiceName,
+				DestinationServiceID:   proxyCfg.TargetServiceID,
+				LocalServiceAddress:    localAddress,
+				LocalServicePort:       localPort,
+				Config:                 proxyCfg.Config,
+				Upstreams:              proxyCfg.Upstreams,
+			},
+		}
+		return reply, nil
+	}
+
+	// Maybe block
+	var queryOpts structs.QueryOptions
+	if parseWait(resp, req, &queryOpts) {
+		// parseWait returns an error itself
+		return nil, nil
+	}
+
+	// Parse the token
+	var token string
+	s.parseToken(req, &token)
+
+	// Parse hash specially.
Eventually this should happen in parseWait and end up + // in QueryOptions but I didn't want to make very general changes right away. + hash := req.URL.Query().Get("hash") + + return s.agentLocalBlockingQuery(resp, hash, &queryOpts, + func(ws memdb.WatchSet) (string, interface{}, error) { + + svcState := s.agent.State.ServiceState(id) + if svcState == nil { + resp.WriteHeader(http.StatusNotFound) + fmt.Fprintf(resp, "unknown proxy service ID: %s", id) + return "", nil, nil + } + + svc := svcState.Service + + // Setup watch on the service + ws.Add(svcState.WatchCh) + + // Check ACLs. + rule, err := s.agent.resolveToken(token) + if err != nil { + return "", nil, err + } + if rule != nil && !rule.ServiceRead(svc.Service) { + return "", nil, acl.ErrPermissionDenied + } + + var connect *api.AgentServiceConnect + var proxy *api.AgentServiceConnectProxyConfig + + if svc.Connect.Native { + connect = &api.AgentServiceConnect{ + Native: svc.Connect.Native, + } + } + + if svc.Kind == structs.ServiceKindConnectProxy { + proxy = svc.Proxy.ToAPI() + } + + // Calculate the content hash over the response, minus the hash field + reply := &api.AgentService{ + Kind: api.ServiceKind(svc.Kind), + ID: svc.ID, + Service: svc.Service, + Tags: svc.Tags, + Meta: svc.Meta, + Port: svc.Port, + Address: svc.Address, + EnableTagOverride: svc.EnableTagOverride, + Proxy: proxy, + Connect: connect, + } + + rawHash, err := hashstructure.Hash(reply, nil) + if err != nil { + return "", nil, err + } + + // Include the ContentHash in the response body + reply.ContentHash = fmt.Sprintf("%x", rawHash) + + return reply.ContentHash, reply, nil + }) +} + func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any. var token string @@ -1254,7 +1404,17 @@ func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash stri return curResp, err } // Watch returned false indicating a change was detected, loop and repeat - // the callback to load the new value. + // the callback to load the new value. If agent sync is paused it means + // local state is currently being bulk-edited e.g. config reload. In this + // case it's likely that local state just got unloaded and may or may not be + // reloaded yet. Wait a short amount of time for Sync to resume to ride out + // typical config reloads. + if syncPauseCh := s.agent.syncPausedCh(); syncPauseCh != nil { + select { + case <-syncPauseCh: + case <-timeout.C: + } + } } } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index c77090d1b3..1a0b6115f5 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -225,6 +225,367 @@ func TestAgent_Services_ACLFilter(t *testing.T) { }) } +func TestAgent_Service(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t.Name(), TestACLConfig()+` + services { + name = "web" + port = 8181 + } + `) + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + + proxy := structs.TestConnectProxyConfig(t) + proxy.DestinationServiceID = "web1" + + // Define a valid local sidecar proxy service + sidecarProxy := &structs.ServiceDefinition{ + Kind: structs.ServiceKindConnectProxy, + Name: "web-sidecar-proxy", + Check: structs.CheckType{ + TCP: "127.0.0.1:8000", + Interval: 10 * time.Second, + }, + Port: 8000, + Proxy: &proxy, + } + + // Define an updated version. Be careful to copy it. 
+	updatedProxy := *sidecarProxy
+	updatedProxy.Port = 9999
+
+	// Mangle the proxy config/upstreams into the expected form with defaults and
+	// API struct types.
+	expectProxy := proxy
+	expectProxy.Upstreams =
+		structs.TestAddDefaultsToUpstreams(t, sidecarProxy.Proxy.Upstreams)
+
+	expectedResponse := &api.AgentService{
+		Kind:        api.ServiceKindConnectProxy,
+		ID:          "web-sidecar-proxy",
+		Service:     "web-sidecar-proxy",
+		Port:        8000,
+		Proxy:       expectProxy.ToAPI(),
+		ContentHash: "26959a754e182054",
+	}
+
+	// Copy and modify
+	updatedResponse := *expectedResponse
+	updatedResponse.Port = 9999
+	updatedResponse.ContentHash = "1bdcf042660b33f6"
+
+	// Simple response for non-proxy service registered in TestAgent config
+	expectWebResponse := &api.AgentService{
+		ID:          "web",
+		Service:     "web",
+		Port:        8181,
+		ContentHash: "7be2b0411161d3b1",
+	}
+
+	tests := []struct {
+		name       string
+		tokenRules string
+		url        string
+		updateFunc func()
+		wantWait   time.Duration
+		wantCode   int
+		wantErr    string
+		wantResp   *api.AgentService
+	}{
+		{
+			name:     "simple fetch - proxy",
+			url:      "/v1/agent/service/web-sidecar-proxy",
+			wantCode: 200,
+			wantResp: expectedResponse,
+		},
+		{
+			name:     "simple fetch - non-proxy",
+			url:      "/v1/agent/service/web",
+			wantCode: 200,
+			wantResp: expectWebResponse,
+		},
+		{
+			name:     "blocking fetch timeout, no change",
+			url:      "/v1/agent/service/web-sidecar-proxy?hash=" + expectedResponse.ContentHash + "&wait=100ms",
+			wantWait: 100 * time.Millisecond,
+			wantCode: 200,
+			wantResp: expectedResponse,
+		},
+		{
+			name:     "blocking fetch old hash should return immediately",
+			url:      "/v1/agent/service/web-sidecar-proxy?hash=123456789abcd&wait=10m",
+			wantCode: 200,
+			wantResp: expectedResponse,
+		},
+		{
+			name: "blocking fetch returns change",
+			url:  "/v1/agent/service/web-sidecar-proxy?hash=" + expectedResponse.ContentHash,
+			updateFunc: func() {
+				time.Sleep(100 * time.Millisecond)
+				// Re-register with new proxy config, make sure we copy the struct so we
+				// don't alter it and affect later test cases.
+				req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(updatedProxy))
+				resp := httptest.NewRecorder()
+				_, err := a.srv.AgentRegisterService(resp, req)
+				require.NoError(t, err)
+				require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
+			},
+			wantWait: 100 * time.Millisecond,
+			wantCode: 200,
+			wantResp: &updatedResponse,
+		},
+		{
+			// This test exercises a case that caused a busy loop to eat CPU for the
+			// entire duration of the blocking query. If a service gets re-registered
+			// with the same proxy config then the old proxy config chan is closed causing
+			// blocked watchset.Watch to return false indicating a change. But since
+			// the hash is the same when the blocking fn is re-called we should just
+			// keep blocking on the next iteration. The bug hit was that the WatchSet
+			// ws was not being reset in the loop and so when you try to `Watch` it
+			// the second time it just returns immediately making the blocking loop
+			// into a busy-poll!
+			//
+			// This test though doesn't catch that because a busy poll still has the
+			// correct external behaviour. I don't want to instrument the loop to
+			// assert it's not executing too fast here as I can't think of a clean way
+			// and the issue is fixed now so this test doesn't actually catch the
+			// error, but does provide an easy way to verify the behaviour by hand:
+			// 1. Make this test fail e.g. change wantErr to true
+			// 2. Add a log.Println or similar into the blocking loop/function
+			// 3. See whether it's called just once or many times in a tight loop.
+			name: "blocking fetch interrupted with no change (same hash)",
+			url:  "/v1/agent/service/web-sidecar-proxy?wait=200ms&hash=" + expectedResponse.ContentHash,
+			updateFunc: func() {
+				time.Sleep(100 * time.Millisecond)
+				// Re-register with _same_ proxy config
+				req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(sidecarProxy))
+				resp := httptest.NewRecorder()
+				_, err := a.srv.AgentRegisterService(resp, req)
+				require.NoError(t, err)
+				require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
+			},
+			wantWait: 200 * time.Millisecond,
+			wantCode: 200,
+			wantResp: expectedResponse,
+		},
+		{
+			// When we reload config, the agent pauses Anti-entropy, then clears all
+			// services (which causes their watch chans to be closed) before loading
+			// state from config/snapshot again. If we do that naively then we don't
+			// just get a spurious wakeup on the watch if the service didn't change,
+			// but it wakes up and then races with the reload and probably sees no
+			// services and returns a 404 error which is gross. This test exercises
+			// that - even though the registrations were from API not config, they are
+			// persisted and cleared/reloaded from snapshot which has the same effect.
+			//
+			// The fix for this test is to allow the same mechanism that pauses
+			// Anti-entropy during reload to also pause the hash blocking loop so we
+			// don't resume until the state is reloaded and we get a chance to see if
+			// it actually changed or not.
+			name: "blocking fetch interrupted by reload shouldn't 404 - no change",
+			url:  "/v1/agent/service/web-sidecar-proxy?wait=200ms&hash=" + expectedResponse.ContentHash,
+			updateFunc: func() {
+				time.Sleep(100 * time.Millisecond)
+				// Reload
+				require.NoError(t, a.ReloadConfig(a.Config))
+			},
+			// Should eventually time out since there is no actual change
+			wantWait: 200 * time.Millisecond,
+			wantCode: 200,
+			wantResp: expectedResponse,
+		},
+		{
+			// As above but test actually altering the service with the config reload.
+			// This simulates the API registration being overridden by a different one
+			// on disk during reload.
+			name: "blocking fetch interrupted by reload shouldn't 404 - changes",
+			url:  "/v1/agent/service/web-sidecar-proxy?wait=10m&hash=" + expectedResponse.ContentHash,
+			updateFunc: func() {
+				time.Sleep(100 * time.Millisecond)
+				// Reload
+				newConfig := *a.Config
+				newConfig.Services = append(newConfig.Services, &updatedProxy)
+				require.NoError(t, a.ReloadConfig(&newConfig))
+			},
+			wantWait: 100 * time.Millisecond,
+			wantCode: 200,
+			wantResp: &updatedResponse,
+		},
+		{
+			name:     "err: non-existent proxy",
+			url:      "/v1/agent/service/nope",
+			wantCode: 404,
+		},
+		{
+			name: "err: bad ACL for service",
+			url:  "/v1/agent/service/web-sidecar-proxy",
+			// Limited token doesn't grant read to the service
+			tokenRules: `
+			key "" {
+				policy = "read"
+			}
+			`,
+			// Note that because we return ErrPermissionDenied and handle writing
+			// status at a higher level helper this actually gets a 200 in this test
+			// case so just assert that it was an error.
+			wantErr: "Permission denied",
+		},
+		{
+			name: "good ACL for service",
+			url:  "/v1/agent/service/web-sidecar-proxy",
+			// Limited token grants read to just this service
+			tokenRules: `
+			service "web-sidecar-proxy" {
+				policy = "read"
+			}
+			`,
+			wantCode: 200,
+			wantResp: expectedResponse,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert := assert.New(t)
+			require := require.New(t)
+
+			// Register the basic service to ensure it's in a known state to start.
+			{
+				req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(sidecarProxy))
+				resp := httptest.NewRecorder()
+				_, err := a.srv.AgentRegisterService(resp, req)
+				require.NoError(err)
+				require.Equal(200, resp.Code, "body: %s", resp.Body.String())
+			}
+
+			req, _ := http.NewRequest("GET", tt.url, nil)
+
+			// Inject the root token for tests that don't care about ACL
+			var token = "root"
+			if tt.tokenRules != "" {
+				// Create new token and use that.
+				token = testCreateToken(t, a, tt.tokenRules)
+			}
+			req.Header.Set("X-Consul-Token", token)
+			resp := httptest.NewRecorder()
+			if tt.updateFunc != nil {
+				go tt.updateFunc()
+			}
+			start := time.Now()
+			obj, err := a.srv.AgentService(resp, req)
+			elapsed := time.Now().Sub(start)
+
+			if tt.wantErr != "" {
+				require.Error(err)
+				require.Contains(strings.ToLower(err.Error()), strings.ToLower(tt.wantErr))
+			} else {
+				require.NoError(err)
+			}
+			if tt.wantCode != 0 {
+				require.Equal(tt.wantCode, resp.Code, "body: %s", resp.Body.String())
+			}
+			if tt.wantWait != 0 {
+				assert.True(elapsed >= tt.wantWait, "should have waited at least %s, "+
+					"took %s", tt.wantWait, elapsed)
+			} else {
+				assert.True(elapsed < 10*time.Millisecond, "should not have waited, "+
+					"took %s", elapsed)
+			}
+
+			if tt.wantResp != nil {
+				assert.Equal(tt.wantResp, obj)
+				assert.Equal(tt.wantResp.ContentHash, resp.Header().Get("X-Consul-ContentHash"))
+			} else {
+				// Janky but Equal doesn't help here because nil !=
+				// *api.AgentService((*api.AgentService)(nil))
+				assert.Nil(obj)
+			}
+		})
+	}
+}
+
+// DEPRECATED(managed-proxies) - remove this. In the interim, we need the newer
+// /agent/service/:service_id endpoint to work for managed proxies so we can
+// switch the built-in proxy to use only that without breaking managed proxies early.
+func TestAgent_Service_DeprecatedManagedProxy(t *testing.T) {
+	t.Parallel()
+	a := NewTestAgent(t.Name(), `
+	connect {
+		proxy {
+			allow_managed_api_registration = true
+		}
+	}
+	`)
+	defer a.Shutdown()
+
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+	svc := &structs.ServiceDefinition{
+		Name: "web",
+		Port: 8000,
+		Check: structs.CheckType{
+			TTL: 10 * time.Second,
+		},
+		Connect: &structs.ServiceConnect{
+			Proxy: &structs.ServiceDefinitionConnectProxy{
+				// Fix the command otherwise the executable path ends up being a random
+				// temp dir in every test run so the ContentHash will never match.
+ Command: []string{"foo"}, + Config: map[string]interface{}{ + "foo": "bar", + "bind_address": "10.10.10.10", + "bind_port": 9999, // make this deterministic + }, + Upstreams: structs.TestUpstreams(t), + }, + }, + } + + require := require.New(t) + + rr := httptest.NewRecorder() + + req, _ := http.NewRequest("POST", "/v1/agent/services/register", jsonReader(svc)) + _, err := a.srv.AgentRegisterService(rr, req) + require.NoError(err) + require.Equal(200, rr.Code, "body:\n"+rr.Body.String()) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest("GET", "/v1/agent/service/web-proxy", nil) + obj, err := a.srv.AgentService(rr, req) + require.NoError(err) + require.Equal(200, rr.Code, "body:\n"+rr.Body.String()) + + gotService, ok := obj.(*api.AgentService) + require.True(ok) + + expect := &api.AgentService{ + Kind: api.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Port: 9999, + Address: "10.10.10.10", + ContentHash: "e24f099e42e88317", + Proxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceID: "web", + DestinationServiceName: "web", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 8000, + Config: map[string]interface{}{ + "foo": "bar", + "bind_port": 9999, + "bind_address": "10.10.10.10", + "local_service_address": "127.0.0.1:8000", + }, + Upstreams: structs.TestAddDefaultsToUpstreams(t, svc.Connect.Proxy.Upstreams).ToAPI(), + }, + } + + require.Equal(expect, gotService) +} + func TestAgent_Checks(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") @@ -1593,38 +1954,49 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) { }, }, }, - SidecarService: &structs.ServiceDefinition{ - Name: "test-proxy", - Meta: map[string]string{ - "some": "meta", - "enable_tag_override": "sidecar_service.meta is 'opaque' so should not get translated", - }, Port: 8001, - EnableTagOverride: true, - Kind: structs.ServiceKindConnectProxy, - Proxy: &structs.ConnectProxyConfig{ - DestinationServiceName: "test", - DestinationServiceID: "test", - LocalServiceAddress: "127.0.0.1", - LocalServicePort: 4321, - Upstreams: structs.Upstreams{ - { - DestinationType: structs.UpstreamDestTypeService, - DestinationName: "db", - DestinationNamespace: "default", - LocalBindAddress: "127.0.0.1", - LocalBindPort: 1234, - Config: map[string]interface{}{ - "destination_type": "sidecar_service.proxy.upstreams.config is 'opaque' so should not get translated", - }, - }, + // The sidecar service is nilled since it is only config sugar and + // shouldn't be represented in state. We assert that the translations + // there worked by inspecting the registered sidecar below. 
+ SidecarService: nil, + }, + } + + got := a.State.Service("test") + require.Equal(t, svc, got) + + sidecarSvc := &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "test-sidecar-proxy", + Service: "test-proxy", + Meta: map[string]string{ + "some": "meta", + "enable_tag_override": "sidecar_service.meta is 'opaque' so should not get translated", + }, + Port: 8001, + EnableTagOverride: true, + LocallyRegisteredAsSidecar: true, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "test", + DestinationServiceID: "test", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 4321, + Upstreams: structs.Upstreams{ + { + DestinationType: structs.UpstreamDestTypeService, + DestinationName: "db", + DestinationNamespace: "default", + LocalBindAddress: "127.0.0.1", + LocalBindPort: 1234, + Config: map[string]interface{}{ + "destination_type": "sidecar_service.proxy.upstreams.config is 'opaque' so should not get translated", }, }, }, }, } - got := a.State.Service("test") - require.Equal(t, svc, got) + gotSidecar := a.State.Service("test-sidecar-proxy") + require.Equal(t, sidecarSvc, gotSidecar) } func TestAgent_RegisterService_ACLDeny(t *testing.T) { @@ -1978,6 +2350,21 @@ func testDefaultSidecar(svc string, port int, fns ...func(*structs.NodeService)) return ns } +func testCreateToken(t *testing.T, a *TestAgent, rules string) string { + args := map[string]interface{}{ + "Name": "User Token", + "Type": "client", + "Rules": rules, + } + req, _ := http.NewRequest("PUT", "/v1/acl/create?token=root", jsonReader(args)) + resp := httptest.NewRecorder() + obj, err := a.srv.ACLCreate(resp, req) + require.NoError(t, err) + require.NotNil(t, obj) + aclResp := obj.(aclCreateResponse) + return aclResp.ID +} + // This tests local agent service registration with a sidecar service. 
Note we // only test simple defaults for the sidecar here since the actual logic for // handling sidecar defaults and port assignment is tested thoroughly in @@ -2361,18 +2748,7 @@ func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) { // Create an ACL token with require policy var token string if tt.enableACL && tt.tokenRules != "" { - args := map[string]interface{}{ - "Name": "User Token", - "Type": "client", - "Rules": tt.tokenRules, - } - req, _ := http.NewRequest("PUT", "/v1/acl/create?token=root", jsonReader(args)) - resp := httptest.NewRecorder() - obj, err := a.srv.ACLCreate(resp, req) - require.NoError(err) - require.NotNil(obj) - aclResp := obj.(aclCreateResponse) - token = aclResp.ID + token = testCreateToken(t, a, tt.tokenRules) } br := bytes.NewBufferString(tt.json) diff --git a/agent/checks/alias.go b/agent/checks/alias.go index e6c7082e57..1b5bbe843e 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -85,16 +85,22 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) { c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh) defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID) + updateStatus := func() { + checks := c.Notify.Checks() + checksList := make([]*structs.HealthCheck, 0, len(checks)) + for _, chk := range checks { + checksList = append(checksList, chk) + } + c.processChecks(checksList) + } + + // Immediately run to get the current state of the target service + updateStatus() + for { select { case <-notifyCh: - checks := c.Notify.Checks() - checksList := make([]*structs.HealthCheck, 0, len(checks)) - for _, chk := range checks { - checksList = append(checksList, chk) - } - c.processChecks(checksList) - + updateStatus() case <-stopCh: return } diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index ea6603345f..a908e31a46 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -435,3 +435,31 @@ func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error return nil } + +// Test that local checks immediately reflect the subject states when added and +// don't require an update to the subject before being accurate. +func TestCheckAlias_localInitialStatus(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := types.CheckID("foo") + rpc := &mockRPC{} + chk := &CheckAlias{ + ServiceID: "web", + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + chk.Start() + defer chk.Stop() + + // Don't touch the aliased service or it's checks (there are none but this is + // valid and should be consisded "passing"). 
+ + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} diff --git a/agent/http_oss.go b/agent/http_oss.go index ac5eff335d..fb708f27ee 100644 --- a/agent/http_oss.go +++ b/agent/http_oss.go @@ -18,6 +18,7 @@ func init() { registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPServer).AgentMonitor) registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPServer).AgentMetrics) registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPServer).AgentServices) + registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPServer).AgentService) registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPServer).AgentChecks) registerEndpoint("/v1/agent/members", []string{"GET"}, (*HTTPServer).AgentMembers) registerEndpoint("/v1/agent/join/", []string{"PUT"}, (*HTTPServer).AgentJoin) diff --git a/agent/local/state.go b/agent/local/state.go index b7735f4584..fcfb6a49dd 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -51,10 +51,16 @@ type ServiceState struct { // Deleted is true when the service record has been marked as deleted // but has not been removed on the server yet. Deleted bool + + // WatchCh is closed when the service state changes suitable for use in a + // memdb.WatchSet when watching agent local changes with hash-based blocking. + WatchCh chan struct{} } -// Clone returns a shallow copy of the object. The service record still -// points to the original service record and must not be modified. +// Clone returns a shallow copy of the object. The service record still points +// to the original service record and must not be modified. The WatchCh is also +// still pointing to the original so the clone will be update when the original +// is. func (s *ServiceState) Clone() *ServiceState { s2 := new(ServiceState) *s2 = *s @@ -279,6 +285,10 @@ func (l *State) RemoveService(id string) error { // entry around until it is actually removed. s.InSync = false s.Deleted = true + if s.WatchCh != nil { + close(s.WatchCh) + s.WatchCh = nil + } l.TriggerSyncChanges() return nil @@ -313,9 +323,10 @@ func (l *State) Services() map[string]*structs.NodeService { return m } -// ServiceState returns a shallow copy of the current service state -// record. The service record still points to the original service -// record and must not be modified. +// ServiceState returns a shallow copy of the current service state record. The +// service record still points to the original service record and must not be +// modified. The WatchCh for the copy returned will also be closed when the +// actual service state is changed. func (l *State) ServiceState(id string) *ServiceState { l.RLock() defer l.RUnlock() @@ -334,7 +345,15 @@ func (l *State) SetServiceState(s *ServiceState) { l.Lock() defer l.Unlock() + s.WatchCh = make(chan struct{}) + + old, hasOld := l.services[s.Service.ID] l.services[s.Service.ID] = s + + if hasOld && old.WatchCh != nil { + close(old.WatchCh) + } + l.TriggerSyncChanges() } @@ -408,13 +427,13 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { return nil } -// AddAliasCheck creates an alias check. When any check for the srcServiceID -// is changed, checkID will reflect that using the same semantics as +// AddAliasCheck creates an alias check. When any check for the srcServiceID is +// changed, checkID will reflect that using the same semantics as // checks.CheckAlias. 
// -// This is a local optimization so that the Alias check doesn't need to -// use blocking queries against the remote server for check updates for -// local services. +// This is a local optimization so that the Alias check doesn't need to use +// blocking queries against the remote server for check updates for local +// services. func (l *State) AddAliasCheck(checkID types.CheckID, srcServiceID string, notifyCh chan<- struct{}) error { l.Lock() defer l.Unlock() diff --git a/agent/local/state_test.go b/agent/local/state_test.go index ff1f12daa3..4eb16e9537 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -417,6 +417,91 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) { assert.Nil(servicesInSync(a.State, 3)) } +func TestAgent_ServiceWatchCh(t *testing.T) { + t.Parallel() + a := &agent.TestAgent{Name: t.Name()} + a.Start() + defer a.Shutdown() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + require := require.New(t) + + // register a local service + srv1 := &structs.NodeService{ + ID: "svc_id1", + Service: "svc1", + Tags: []string{"tag1"}, + Port: 6100, + } + require.NoError(a.State.AddService(srv1, "")) + + verifyState := func(ss *local.ServiceState) { + require.NotNil(ss) + require.NotNil(ss.WatchCh) + + // Sanity check WatchCh blocks + select { + case <-ss.WatchCh: + t.Fatal("should block until service changes") + default: + } + } + + // Should be able to get a ServiceState + ss := a.State.ServiceState(srv1.ID) + verifyState(ss) + + // Update service in another go routine + go func() { + srv2 := srv1 + srv2.Port = 6200 + require.NoError(a.State.AddService(srv2, "")) + }() + + // We should observe WatchCh close + select { + case <-ss.WatchCh: + // OK! + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for WatchCh to close") + } + + // Should also fire for state being set explicitly + ss = a.State.ServiceState(srv1.ID) + verifyState(ss) + + go func() { + a.State.SetServiceState(&local.ServiceState{ + Service: ss.Service, + Token: "foo", + }) + }() + + // We should observe WatchCh close + select { + case <-ss.WatchCh: + // OK! + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for WatchCh to close") + } + + // Should also fire for service being removed + ss = a.State.ServiceState(srv1.ID) + verifyState(ss) + + go func() { + require.NoError(a.State.RemoveService(srv1.ID)) + }() + + // We should observe WatchCh close + select { + case <-ss.WatchCh: + // OK! + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for WatchCh to close") + } +} + func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) { t.Parallel() a := &agent.TestAgent{Name: t.Name()} diff --git a/agent/proxyprocess/proxy.go b/agent/proxyprocess/proxy.go index e2ef7fe47a..321dcb6a9c 100644 --- a/agent/proxyprocess/proxy.go +++ b/agent/proxyprocess/proxy.go @@ -20,6 +20,11 @@ const ( // EnvProxyToken is the name of the environment variable that is passed // to managed proxies containing the proxy token. EnvProxyToken = "CONNECT_PROXY_TOKEN" + + // EnvSidecarFor is the name of the environment variable that is set for + // sidecar proxies containing the service ID of their target on the local + // agent + EnvSidecarFor = "CONNECT_SIDECAR_FOR" ) // Proxy is the interface implemented by all types of managed proxies. 
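For context, the new /v1/agent/service/:service_id endpoint blocks on exactly this plumbing: each blocking-query iteration builds a fresh go-memdb WatchSet, adds the service's WatchCh, and waits for either a change or the query timeout. A minimal, self-contained sketch of that pattern follows (not part of the patch; the package and helper names are hypothetical), assuming access to the agent's *local.State:

// Illustration only: block until a locally registered service changes, using
// ServiceState.WatchCh together with a go-memdb WatchSet. The helper name
// waitForServiceChange is hypothetical.
package agentlocal

import (
	"time"

	"github.com/hashicorp/consul/agent/local"
	memdb "github.com/hashicorp/go-memdb"
)

// waitForServiceChange returns true if the service state changed before the
// timeout, false if the wait timed out or the service is unknown.
func waitForServiceChange(state *local.State, id string, maxWait time.Duration) bool {
	svcState := state.ServiceState(id)
	if svcState == nil {
		return false
	}

	// A fresh WatchSet must be built on every iteration of a blocking loop;
	// re-using one that has already fired returns immediately (the busy-poll
	// bug called out in the endpoint tests above).
	ws := memdb.NewWatchSet()
	ws.Add(svcState.WatchCh)

	timeout := time.NewTimer(maxWait)
	defer timeout.Stop()

	// Watch returns true if the timeout fired, false if a watched channel
	// (here the service's WatchCh) was closed by an update or removal.
	return !ws.Watch(timeout.C)
}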
diff --git a/agent/sidecar_service.go b/agent/sidecar_service.go
index f8c3c03d5b..ba31e2f74d 100644
--- a/agent/sidecar_service.go
+++ b/agent/sidecar_service.go
@@ -2,7 +2,6 @@ package agent
 
 import (
 	"fmt"
-	"math/rand"
 	"time"
 
 	"github.com/hashicorp/consul/agent/structs"
@@ -88,20 +87,45 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
 	// Allocate port if needed (min and max inclusive).
 	rangeLen := a.config.ConnectSidecarMaxPort - a.config.ConnectSidecarMinPort + 1
 	if sidecar.Port < 1 && a.config.ConnectSidecarMinPort > 0 && rangeLen > 0 {
-		// This should be a really short list so don't bother optimising lookup yet.
-	OUTER:
-		for _, offset := range rand.Perm(rangeLen) {
-			p := a.config.ConnectSidecarMinPort + offset
-			// See if this port was already allocated to another service
-			for _, otherNS := range a.State.Services() {
-				if otherNS.Port == p {
-					// already taken, skip to next random pick in the range
-					continue OUTER
+		// This used to pick at random, which was simpler, but a consul reload would
+		// assign new ports to all the sidecars since it unloads all state and
+		// re-populates. It also made this more difficult to test (have to pin the
+		// range to one etc.). Instead we assign sequentially, but rather than N^2
+		// lookups, just iterate services once and find the set of used ports in the
+		// allocation range. We could maintain this state permanently in the agent
+		// but it doesn't seem to be necessary - even with thousands of services
+		// this is not expensive to compute.
+		usedPorts := make(map[int]struct{})
+		for _, otherNS := range a.State.Services() {
+			// Check if other port is in auto-assign range
+			if otherNS.Port >= a.config.ConnectSidecarMinPort &&
+				otherNS.Port <= a.config.ConnectSidecarMaxPort {
+				if otherNS.ID == sidecar.ID {
+					// This sidecar is already registered with an auto-port and is just
+					// being updated so pick the same port as before rather than allocate
+					// a new one.
+					sidecar.Port = otherNS.Port
+					break
+				}
+				usedPorts[otherNS.Port] = struct{}{}
+			}
+			// Note that the proxy might already be registered with a port that was
+			// not in the auto range or the auto range has moved. In either case we
+			// want to allocate a new one so it's no different from ignoring that it
+			// already exists as we do now.
+		}
+
+		// Check we still need to assign a port and didn't find we already had one
+		// allocated.
+ if sidecar.Port < 1 { + // Iterate until we find lowest unused port + for p := a.config.ConnectSidecarMinPort; p <= a.config.ConnectSidecarMaxPort; p++ { + _, used := usedPorts[p] + if !used { + sidecar.Port = p + break } } - // We made it through all existing proxies without a match so claim this one - sidecar.Port = p - break } } // If no ports left (or auto ports disabled) fail diff --git a/agent/sidecar_service_test.go b/agent/sidecar_service_test.go index 3f3e7a7734..fc6d68983e 100644 --- a/agent/sidecar_service_test.go +++ b/agent/sidecar_service_test.go @@ -1,6 +1,7 @@ package agent import ( + "fmt" "testing" "time" @@ -11,6 +12,7 @@ import ( func TestAgent_sidecarServiceFromNodeService(t *testing.T) { tests := []struct { name string + maxPort int preRegister *structs.ServiceDefinition sd *structs.ServiceDefinition token string @@ -200,17 +202,74 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) { token: "foo", wantErr: "reserved for internal use", }, + { + name: "re-registering same sidecar with no port should pick same one", + // Allow multiple ports to be sure we get the right one + maxPort: 2500, + // Pre register the sidecar we want + preRegister: &structs.ServiceDefinition{ + Kind: structs.ServiceKindConnectProxy, + ID: "web1-sidecar-proxy", + Name: "web-sidecar-proxy", + Port: 2222, + Proxy: &structs.ConnectProxyConfig{ + DestinationServiceName: "web", + DestinationServiceID: "web1", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 1111, + }, + }, + // Register same again but with different service port + sd: &structs.ServiceDefinition{ + ID: "web1", + Name: "web", + Port: 1112, + Connect: &structs.ServiceConnect{ + SidecarService: &structs.ServiceDefinition{}, + }, + }, + token: "foo", + wantNS: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web1-sidecar-proxy", + Service: "web-sidecar-proxy", + Port: 2222, // Should claim the same port as before + LocallyRegisteredAsSidecar: true, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + DestinationServiceID: "web1", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 1112, + }, + }, + wantChecks: []*structs.CheckType{ + &structs.CheckType{ + Name: "Connect Sidecar Listening", + TCP: "127.0.0.1:2222", + Interval: 10 * time.Second, + }, + &structs.CheckType{ + Name: "Connect Sidecar Aliasing web1", + AliasService: "web1", + }, + }, + wantToken: "foo", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Set port range to make it deterministic. This allows a single assigned - // port at 2222 thanks to being inclusive at both ends. - hcl := ` + // Set port range to be tiny (one availabl) to test consuming all of it. + // This allows a single assigned port at 2222 thanks to being inclusive at + // both ends. + if tt.maxPort == 0 { + tt.maxPort = 2222 + } + hcl := fmt.Sprintf(` ports { sidecar_min_port = 2222 - sidecar_max_port = 2222 + sidecar_max_port = %d } - ` + `, tt.maxPort) if tt.autoPortsDisabled { hcl = ` ports { diff --git a/agent/structs/testing_connect_proxy_config.go b/agent/structs/testing_connect_proxy_config.go index aa058f58ff..700154b56e 100644 --- a/agent/structs/testing_connect_proxy_config.go +++ b/agent/structs/testing_connect_proxy_config.go @@ -39,7 +39,7 @@ func TestUpstreams(t testing.T) Upstreams { // TestUpstreams) and adds default values that are populated during // refigistration. Use this for generating the expected Upstreams value after // registration. 
-func TestAddDefaultsToUpstreams(t testing.T, upstreams []Upstream) []Upstream { +func TestAddDefaultsToUpstreams(t testing.T, upstreams []Upstream) Upstreams { ups := make([]Upstream, len(upstreams)) for i := range upstreams { ups[i] = upstreams[i] diff --git a/api/agent.go b/api/agent.go index 9c6665ffe4..f9447a25a0 100644 --- a/api/agent.go +++ b/api/agent.go @@ -82,6 +82,7 @@ type AgentService struct { EnableTagOverride bool CreateIndex uint64 `json:",omitempty"` ModifyIndex uint64 `json:",omitempty"` + ContentHash string `json:",omitempty"` // DEPRECATED (ProxyDestination) - remove this field ProxyDestination string `json:",omitempty"` Proxy *AgentServiceConnectProxyConfig `json:",omitempty"` @@ -262,10 +263,12 @@ type ConnectProxyConfig struct { TargetServiceID string TargetServiceName string ContentHash string - ExecMode ProxyExecMode - Command []string - Config map[string]interface{} - Upstreams []Upstream + // DEPRECATED(managed-proxies) - this struct is re-used for sidecar configs + // but they don't need ExecMode or Command + ExecMode ProxyExecMode `json:",omitempty"` + Command []string `json:",omitempty"` + Config map[string]interface{} + Upstreams []Upstream } // Upstream is the response structure for a proxy upstream configuration. @@ -384,6 +387,33 @@ func (a *Agent) Services() (map[string]*AgentService, error) { return out, nil } +// Service returns a locally registered service instance and allows for +// hash-based blocking. +// +// Note that this uses an unconventional blocking mechanism since it's +// agent-local state. That means there is no persistent raft index so we block +// based on object hash instead. +func (a *Agent) Service(serviceID string, q *QueryOptions) (*AgentService, *QueryMeta, error) { + r := a.c.newRequest("GET", "/v1/agent/service/"+serviceID) + r.setQueryOptions(q) + rtt, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + var out *AgentService + if err := decodeBody(resp, &out); err != nil { + return nil, nil, err + } + + return out, qm, nil +} + // Members returns the known gossip members. The WAN // flag can be used to query a server for WAN members. func (a *Agent) Members(wan bool) ([]*AgentMember, error) { diff --git a/api/agent_test.go b/api/agent_test.go index 02fb0f0881..27da62eb18 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -642,6 +642,57 @@ func TestAPI_AgentServices_MultipleChecks(t *testing.T) { } } +func TestAPI_AgentService(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + agent := c.Agent() + + require := require.New(t) + + reg := &AgentServiceRegistration{ + Name: "foo", + Tags: []string{"bar", "baz"}, + Port: 8000, + Checks: AgentServiceChecks{ + &AgentServiceCheck{ + TTL: "15s", + }, + &AgentServiceCheck{ + TTL: "30s", + }, + }, + } + require.NoError(agent.ServiceRegister(reg)) + + got, qm, err := agent.Service("foo", nil) + require.NoError(err) + + expect := &AgentService{ + ID: "foo", + Service: "foo", + Tags: []string{"bar", "baz"}, + ContentHash: "bf5bd67c5d74b26d", + Port: 8000, + } + require.Equal(expect, got) + require.Equal(expect.ContentHash, qm.LastContentHash) + + // Sanity check blocking behaviour - this is more thoroughly tested in the + // agent endpoint tests but this ensures that the API package is at least + // passing the hash param properly. 
+ opts := QueryOptions{ + WaitHash: "bf5bd67c5d74b26d", + WaitTime: 100 * time.Millisecond, // Just long enough to be reliably measurable + } + start := time.Now() + got, qm, err = agent.Service("foo", &opts) + elapsed := time.Since(start) + require.NoError(err) + require.True(elapsed >= opts.WaitTime) +} + func TestAPI_AgentSetTTLStatus(t *testing.T) { t.Parallel() c, s := makeClient(t) diff --git a/command/connect/proxy/proxy.go b/command/connect/proxy/proxy.go index 779e8e9e85..2b0ddf551e 100644 --- a/command/connect/proxy/proxy.go +++ b/command/connect/proxy/proxy.go @@ -11,6 +11,7 @@ import ( "os" "sort" "strconv" + "strings" proxyAgent "github.com/hashicorp/consul/agent/proxyprocess" "github.com/hashicorp/consul/api" @@ -51,6 +52,7 @@ type cmd struct { logLevel string cfgFile string proxyID string + sidecarFor string pprofAddr string service string serviceAddr string @@ -69,6 +71,12 @@ func (c *cmd) init() { c.flags.StringVar(&c.proxyID, "proxy-id", "", "The proxy's ID on the local agent.") + c.flags.StringVar(&c.sidecarFor, "sidecar-for", "", + "The ID of a service instance on the local agent that this proxy should "+ + "become a sidecar for. It requires that the proxy service is registered "+ + "with the agent as a connect-proxy with Proxy.DestinationServiceID set "+ + "to this value. If more than one such proxy is registered it will fail.") + c.flags.StringVar(&c.logLevel, "log-level", "INFO", "Specifies the log level.") @@ -118,6 +126,9 @@ func (c *cmd) Run(args []string) int { if c.proxyID == "" { c.proxyID = os.Getenv(proxyAgent.EnvProxyID) } + if c.sidecarFor == "" { + c.sidecarFor = os.Getenv(proxyAgent.EnvSidecarFor) + } if c.http.Token() == "" { c.http.SetToken(os.Getenv(proxyAgent.EnvProxyToken)) } @@ -200,6 +211,32 @@ func (c *cmd) Run(args []string) int { return 0 } +func (c *cmd) lookupProxyIDForSidecar(client *api.Client) (string, error) { + svcs, err := client.Agent().Services() + if err != nil { + return "", fmt.Errorf("Failed looking up sidecar proxy info for %s: %s", + c.sidecarFor, err) + } + + var proxyIDs []string + for _, svc := range svcs { + if svc.Kind == api.ServiceKindConnectProxy && svc.Proxy != nil && + strings.ToLower(svc.Proxy.DestinationServiceID) == c.sidecarFor { + proxyIDs = append(proxyIDs, svc.ID) + } + } + + if len(proxyIDs) == 0 { + return "", fmt.Errorf("No sidecar proxy registereded for %s", c.sidecarFor) + } + if len(proxyIDs) > 1 { + return "", fmt.Errorf("More than one sidecar proxy registereded for %s.\n"+ + " Start proxy with -proxy-id and one of the following IDs: %s", + c.sidecarFor, strings.Join(proxyIDs, ", ")) + } + return proxyIDs[0], nil +} + func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error) { // Use the configured proxy ID if c.proxyID != "" { @@ -208,6 +245,21 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error) return proxyImpl.NewAgentConfigWatcher(client, c.proxyID, c.logger) } + if c.sidecarFor != "" { + // Running as a sidecar, we need to find the proxy-id for the requested + // service + var err error + c.proxyID, err = c.lookupProxyIDForSidecar(client) + if err != nil { + return nil, err + } + + c.UI.Info("Configuration mode: Agent API") + c.UI.Info(fmt.Sprintf(" Sidecar for ID: %s", c.sidecarFor)) + c.UI.Info(fmt.Sprintf(" Proxy ID: %s", c.proxyID)) + return proxyImpl.NewAgentConfigWatcher(client, c.proxyID, c.logger) + } + // Otherwise, we're representing a manually specified service. 
if c.service == "" { return nil, fmt.Errorf( diff --git a/command/connect/proxy/proxy_test.go b/command/connect/proxy/proxy_test.go index 9970a6d7a0..f2f470281b 100644 --- a/command/connect/proxy/proxy_test.go +++ b/command/connect/proxy/proxy_test.go @@ -14,27 +14,28 @@ func TestCommandConfigWatcher(t *testing.T) { t.Parallel() cases := []struct { - Name string - Flags []string - Test func(*testing.T, *proxy.Config) + Name string + Flags []string + Test func(*testing.T, *proxy.Config) + WantErr string }{ { - "-service flag only", - []string{"-service", "web"}, - func(t *testing.T, cfg *proxy.Config) { + Name: "-service flag only", + Flags: []string{"-service", "web"}, + Test: func(t *testing.T, cfg *proxy.Config) { require.Equal(t, 0, cfg.PublicListener.BindPort) require.Len(t, cfg.Upstreams, 0) }, }, { - "-service flag with upstreams", - []string{ + Name: "-service flag with upstreams", + Flags: []string{ "-service", "web", "-upstream", "db:1234", "-upstream", "db2:2345", }, - func(t *testing.T, cfg *proxy.Config) { + Test: func(t *testing.T, cfg *proxy.Config) { require.Equal(t, 0, cfg.PublicListener.BindPort) require.Len(t, cfg.Upstreams, 2) require.Equal(t, 1234, cfg.Upstreams[0].LocalBindPort) @@ -43,9 +44,9 @@ func TestCommandConfigWatcher(t *testing.T) { }, { - "-service flag with -service-addr", - []string{"-service", "web"}, - func(t *testing.T, cfg *proxy.Config) { + Name: "-service flag with -service-addr", + Flags: []string{"-service", "web"}, + Test: func(t *testing.T, cfg *proxy.Config) { // -service-addr has no affect since -listen isn't set require.Equal(t, 0, cfg.PublicListener.BindPort) require.Len(t, cfg.Upstreams, 0) @@ -53,13 +54,13 @@ func TestCommandConfigWatcher(t *testing.T) { }, { - "-service, -service-addr, -listen", - []string{ + Name: "-service, -service-addr, -listen", + Flags: []string{ "-service", "web", "-service-addr", "127.0.0.1:1234", "-listen", ":4567", }, - func(t *testing.T, cfg *proxy.Config) { + Test: func(t *testing.T, cfg *proxy.Config) { require.Len(t, cfg.Upstreams, 0) require.Equal(t, "", cfg.PublicListener.BindAddress) @@ -67,13 +68,81 @@ func TestCommandConfigWatcher(t *testing.T) { require.Equal(t, "127.0.0.1:1234", cfg.PublicListener.LocalServiceAddress) }, }, + + { + Name: "-sidecar-for, no sidecar", + Flags: []string{ + "-sidecar-for", "no-sidecar", + }, + WantErr: "No sidecar proxy registered", + }, + + { + Name: "-sidecar-for, multiple sidecars", + Flags: []string{ + "-sidecar-for", "two-sidecars", + }, + // Order is non-deterministic so don't assert the list of proxy IDs here + WantErr: `More than one sidecar proxy registereded for two-sidecars. + Start proxy with -proxy-id and one of the following IDs: `, + }, + + { + Name: "-sidecar-for, non-existent", + Flags: []string{ + "-sidecar-for", "foo", + }, + WantErr: "No sidecar proxy registered", + }, + + { + Name: "-sidecar-for, one sidecar", + Flags: []string{ + "-sidecar-for", "one-sidecar", + }, + Test: func(t *testing.T, cfg *proxy.Config) { + // Sanity check we got the right instance. 
+ require.Equal(t, 9999, cfg.PublicListener.BindPort) + }, + }, } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { require := require.New(t) - a := agent.NewTestAgent(t.Name(), ``) + // Registere a few services with 0, 1 and 2 sidecars + a := agent.NewTestAgent(t.Name(), ` + services { + name = "no-sidecar" + port = 1111 + } + services { + name = "one-sidecar" + port = 2222 + connect { + sidecar_service { + port = 9999 + } + } + } + services { + name = "two-sidecars" + port = 3333 + connect { + sidecar_service {} + } + } + services { + kind = "connect-proxy" + name = "other-sidecar-for-two-sidecars" + port = 4444 + proxy { + destination_service_id = "two-sidecars" + destination_service_name = "two-sidecars" + } + } + `) defer a.Shutdown() client := a.Client() @@ -81,16 +150,24 @@ func TestCommandConfigWatcher(t *testing.T) { c := New(ui, make(chan struct{})) c.testNoStart = true - // Run and purposely fail the command + // Run the command code := c.Run(append([]string{ "-http-addr=" + a.HTTPAddr(), }, tc.Flags...)) - require.Equal(0, code, ui.ErrorWriter.String()) + if tc.WantErr == "" { + require.Equal(0, code, ui.ErrorWriter.String()) + } else { + require.Equal(1, code, ui.ErrorWriter.String()) + require.Contains(ui.ErrorWriter.String(), tc.WantErr) + return + } // Get the configuration watcher cw, err := c.configWatcher(client) require.NoError(err) - tc.Test(t, testConfig(t, cw)) + if tc.Test != nil { + tc.Test(t, testConfig(t, cw)) + } }) } } diff --git a/command/watch/watch.go b/command/watch/watch.go index bf46914576..457853ea2a 100644 --- a/command/watch/watch.go +++ b/command/watch/watch.go @@ -135,7 +135,7 @@ func (c *cmd) Run(args []string) int { return 1 } - if strings.HasPrefix(wp.Type, "connect_") { + if strings.HasPrefix(wp.Type, "connect_") || strings.HasPrefix(wp.Type, "agent_") { c.UI.Error(fmt.Sprintf("Type %s is not supported in the CLI tool", wp.Type)) return 1 } diff --git a/command/watch/watch_test.go b/command/watch/watch_test.go index ff6463852c..735418fff7 100644 --- a/command/watch/watch_test.go +++ b/command/watch/watch_test.go @@ -57,3 +57,23 @@ func TestWatchCommandNoConnect(t *testing.T) { t.Fatalf("bad: %#v", ui.ErrorWriter.String()) } } + +func TestWatchCommandNoAgentService(t *testing.T) { + t.Parallel() + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + + ui := cli.NewMockUi() + c := New(ui, nil) + args := []string{"-http-addr=" + a.HTTPAddr(), "-type=agent_service"} + + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. 
%#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.ErrorWriter.String(), + "Type agent_service is not supported in the CLI tool") { + t.Fatalf("bad: %#v", ui.ErrorWriter.String()) + } +} diff --git a/connect/proxy/config.go b/connect/proxy/config.go index 628a508ba7..95c974bef9 100644 --- a/connect/proxy/config.go +++ b/connect/proxy/config.go @@ -194,8 +194,8 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string, // Setup watch plan for config plan, err := watch.Parse(map[string]interface{}{ - "type": "connect_proxy_config", - "proxy_service_id": w.proxyID, + "type": "agent_service", + "service_id": w.proxyID, }) if err != nil { return nil, err @@ -209,20 +209,26 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string, func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal, val interface{}) { - resp, ok := val.(*api.ConnectProxyConfig) + resp, ok := val.(*api.AgentService) if !ok { w.logger.Printf("[WARN] proxy config watch returned bad response: %v", val) return } + if resp.Kind != api.ServiceKindConnectProxy { + w.logger.Printf("[ERR] service with id %s is not a valid connect proxy", + w.proxyID) + return + } + // Create proxy config from the response cfg := &Config{ // Token should be already setup in the client - ProxiedServiceName: resp.TargetServiceName, + ProxiedServiceName: resp.Proxy.DestinationServiceName, ProxiedServiceNamespace: "default", } - if tRaw, ok := resp.Config["telemetry"]; ok { + if tRaw, ok := resp.Proxy.Config["telemetry"]; ok { err := mapstructure.Decode(tRaw, &cfg.Telemetry) if err != nil { w.logger.Printf("[WARN] proxy telemetry config failed to parse: %s", err) @@ -230,15 +236,18 @@ func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal, } // Unmarshal configs - err := mapstructure.Decode(resp.Config, &cfg.PublicListener) + err := mapstructure.Decode(resp.Proxy.Config, &cfg.PublicListener) if err != nil { - w.logger.Printf("[ERR] proxy config watch public listener config "+ - "couldn't be parsed: %s", err) - return + w.logger.Printf("[ERR] failed to parse public listener config: %s", err) } + cfg.PublicListener.BindAddress = resp.Address + cfg.PublicListener.BindPort = resp.Port + cfg.PublicListener.LocalServiceAddress = fmt.Sprintf("%s:%d", + resp.Proxy.LocalServiceAddress, resp.Proxy.LocalServicePort) + cfg.PublicListener.applyDefaults() - for _, u := range resp.Upstreams { + for _, u := range resp.Proxy.Upstreams { uc := UpstreamConfig(u) uc.applyDefaults() cfg.Upstreams = append(cfg.Upstreams, uc) diff --git a/connect/proxy/config_test.go b/connect/proxy/config_test.go index c94fdab750..7129b74acd 100644 --- a/connect/proxy/config_test.go +++ b/connect/proxy/config_test.go @@ -79,7 +79,7 @@ func TestUpstreamResolverFuncFromClient(t *testing.T) { } } -func TestAgentConfigWatcher(t *testing.T) { +func TestAgentConfigWatcherManagedProxy(t *testing.T) { t.Parallel() a := agent.NewTestAgent("agent_smith", ` @@ -95,7 +95,6 @@ func TestAgentConfigWatcher(t *testing.T) { client := a.Client() agent := client.Agent() - // Register a service with a proxy // Register a local agent service with a managed proxy reg := &api.AgentServiceRegistration{ Name: "web", @@ -178,6 +177,96 @@ func TestAgentConfigWatcher(t *testing.T) { assert.Equal(t, expectCfg, cfg) } +func TestAgentConfigWatcherSidecarProxy(t *testing.T) { + t.Parallel() + + a := agent.NewTestAgent("agent_smith", ``) + defer a.Shutdown() + + client := a.Client() + agent := client.Agent() + + // Register a local agent service with a managed proxy + 
reg := &api.AgentServiceRegistration{ + Name: "web", + Port: 8080, + Connect: &api.AgentServiceConnect{ + SidecarService: &api.AgentServiceRegistration{ + Proxy: &api.AgentServiceConnectProxyConfig{ + Config: map[string]interface{}{ + "handshake_timeout_ms": 999, + }, + Upstreams: []api.Upstream{ + { + DestinationName: "db", + LocalBindPort: 9191, + }, + }, + }, + }, + }, + } + err := agent.ServiceRegister(reg) + require.NoError(t, err) + + w, err := NewAgentConfigWatcher(client, "web-sidecar-proxy", + log.New(os.Stderr, "", log.LstdFlags)) + require.NoError(t, err) + + cfg := testGetConfigValTimeout(t, w, 500*time.Millisecond) + + expectCfg := &Config{ + ProxiedServiceName: "web", + ProxiedServiceNamespace: "default", + PublicListener: PublicListenerConfig{ + BindAddress: "0.0.0.0", + BindPort: 21000, + LocalServiceAddress: "127.0.0.1:8080", + HandshakeTimeoutMs: 999, + LocalConnectTimeoutMs: 1000, // from applyDefaults + }, + Upstreams: []UpstreamConfig{ + { + DestinationName: "db", + DestinationNamespace: "default", + DestinationType: "service", + LocalBindPort: 9191, + LocalBindAddress: "127.0.0.1", + }, + }, + } + + require.Equal(t, expectCfg, cfg) + + // Now keep watching and update the config. + go func() { + // Wait for watcher to be watching + time.Sleep(20 * time.Millisecond) + reg.Connect.SidecarService.Proxy.Upstreams = append(reg.Connect.SidecarService.Proxy.Upstreams, + api.Upstream{ + DestinationName: "cache", + LocalBindPort: 9292, + LocalBindAddress: "127.10.10.10", + }) + reg.Connect.SidecarService.Proxy.Config["local_connect_timeout_ms"] = 444 + err := agent.ServiceRegister(reg) + require.NoError(t, err) + }() + + cfg = testGetConfigValTimeout(t, w, 2*time.Second) + + expectCfg.Upstreams = append(expectCfg.Upstreams, UpstreamConfig{ + DestinationName: "cache", + DestinationNamespace: "default", + DestinationType: "service", + LocalBindPort: 9292, + LocalBindAddress: "127.10.10.10", + }) + expectCfg.PublicListener.LocalConnectTimeoutMs = 444 + + assert.Equal(t, expectCfg, cfg) +} + func testGetConfigValTimeout(t *testing.T, w ConfigWatcher, timeout time.Duration) *Config { t.Helper() diff --git a/connect/proxy/proxy.go b/connect/proxy/proxy.go index ecf2170b04..b6c5edff9b 100644 --- a/connect/proxy/proxy.go +++ b/connect/proxy/proxy.go @@ -1,7 +1,6 @@ package proxy import ( - "bytes" "crypto/x509" "log" @@ -69,12 +68,17 @@ func (p *Proxy) Serve() error { go func() { <-service.ReadyWait() - p.logger.Printf("[INFO] proxy loaded config and ready to serve") + p.logger.Printf("[INFO] Proxy loaded config and ready to serve") tcfg := service.ServerTLSConfig() cert, _ := tcfg.GetCertificate(nil) leaf, _ := x509.ParseCertificate(cert.Certificate[0]) - p.logger.Printf("[DEBUG] leaf: %s roots: %s", leaf.URIs[0], - bytes.Join(tcfg.RootCAs.Subjects(), []byte(","))) + p.logger.Printf("[INFO] TLS Identity: %s", leaf.URIs[0]) + roots, err := connect.CommonNamesFromCertPool(tcfg.RootCAs) + if err != nil { + p.logger.Printf("[ERR] Failed to parse root subjects: %s", err) + } else { + p.logger.Printf("[INFO] TLS Roots : %v", roots) + } // Only start a listener if we have a port set. This allows // the configuration to disable our public listener. 
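The root-logging change above relies on a small helper added to connect/tls.go later in this diff. As background (not part of the patch; the package name is hypothetical), a sketch of the standard-library decoding it performs: an x509.CertPool only exposes its roots as raw DER subjects, so each one is unmarshalled as a PKIX RDNSequence and rebuilt into a pkix.Name to recover the CommonName.

// Illustration only: mirrors what connect.CommonNamesFromCertPool does.
package certnames

import (
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/asn1"
)

// commonNames returns the CommonName of every root subject in the pool.
func commonNames(pool *x509.CertPool) ([]string, error) {
	var names []string
	for _, rawSubject := range pool.Subjects() {
		var seq pkix.RDNSequence
		if _, err := asn1.Unmarshal(rawSubject, &seq); err != nil {
			return nil, err
		}
		var name pkix.Name
		name.FillFromRDNSequence(&seq)
		names = append(names, name.CommonName)
	}
	return names, nil
}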
@@ -87,6 +91,7 @@ func (p *Proxy) Serve() error { p.logger.Printf("[ERR] failed to start public listener: %s", err) failCh <- err } + } }() } diff --git a/connect/tls.go b/connect/tls.go index 7c513dc611..86daa6645d 100644 --- a/connect/tls.go +++ b/connect/tls.go @@ -3,6 +3,8 @@ package connect import ( "crypto/tls" "crypto/x509" + "crypto/x509/pkix" + "encoding/asn1" "errors" "fmt" "io/ioutil" @@ -107,6 +109,33 @@ func devTLSConfigFromFiles(caFile, certFile, return cfg, nil } +// PKIXNameFromRawSubject attempts to parse a DER encoded "Subject" as a PKIX +// Name. It's useful for inspecting root certificates in an x509.CertPool which +// only expose RawSubject via the Subjects method. +func PKIXNameFromRawSubject(raw []byte) (*pkix.Name, error) { + var subject pkix.RDNSequence + if _, err := asn1.Unmarshal(raw, &subject); err != nil { + return nil, err + } + var name pkix.Name + name.FillFromRDNSequence(&subject) + return &name, nil +} + +// CommonNamesFromCertPool returns the common names of the certificates in the +// cert pool. +func CommonNamesFromCertPool(p *x509.CertPool) ([]string, error) { + var names []string + for _, rawSubj := range p.Subjects() { + n, err := PKIXNameFromRawSubject(rawSubj) + if err != nil { + return nil, err + } + names = append(names, n.CommonName) + } + return names, nil +} + // CertURIFromConn is a helper to extract the service identifier URI from a // net.Conn. If the net.Conn is not a *tls.Conn then an error is always // returned. If the *tls.Conn didn't present a valid connect certificate, or is diff --git a/watch/funcs.go b/watch/funcs.go index a87fd63f4c..497ed7eb6e 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -26,6 +26,7 @@ func init() { "connect_roots": connectRootsWatch, "connect_leaf": connectLeafWatch, "connect_proxy_config": connectProxyConfigWatch, + "agent_service": agentServiceWatch, } } @@ -305,6 +306,33 @@ func connectProxyConfigWatch(params map[string]interface{}) (WatcherFunc, error) return fn, nil } +// agentServiceWatch is used to watch for changes to a single service instance +// on the local agent. Note that this state is agent-local so the watch +// mechanism uses `hash` rather than `index` for deciding whether to block. +func agentServiceWatch(params map[string]interface{}) (WatcherFunc, error) { + // We don't support consistency modes since it's agent local data + + var serviceID string + if err := assignValue(params, "service_id", &serviceID); err != nil { + return nil, err + } + + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { + agent := p.client.Agent() + opts := makeQueryOptionsWithContext(p, false) + defer p.cancelFunc() + + svc, _, err := agent.Service(serviceID, &opts) + if err != nil { + return nil, nil, err + } + + // Return string ContentHash since we don't have Raft indexes to block on. 
+ return WaitHashVal(svc.ContentHash), svc, err + } + return fn, nil +} + func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions { ctx, cancel := context.WithCancel(context.Background()) p.cancelFunc = cancel diff --git a/watch/funcs_test.go b/watch/funcs_test.go index aaaa1ad656..d50bb37636 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -736,6 +736,63 @@ func TestConnectProxyConfigWatch(t *testing.T) { wg.Wait() } +func TestAgentServiceWatch(t *testing.T) { + t.Parallel() + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + // Register a local agent service + reg := &consulapi.AgentServiceRegistration{ + Name: "web", + Port: 8080, + } + client := a.Client() + agent := client.Agent() + err := agent.ServiceRegister(reg) + require.NoError(t, err) + + invoke := makeInvokeCh() + plan := mustParse(t, `{"type":"agent_service", "service_id":"web"}`) + plan.HybridHandler = func(blockParamVal watch.BlockingParamVal, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(*consulapi.AgentService) + if !ok || v == nil { + return // ignore + } + invoke <- nil + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(20 * time.Millisecond) + + // Change the service definition + reg.Port = 9090 + err := agent.ServiceRegister(reg) + require.NoError(t, err) + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { + t.Fatalf("err: %v", err) + } + + plan.Stop() + wg.Wait() +} + func mustParse(t *testing.T, q string) *watch.Plan { t.Helper() var params map[string]interface{}
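For completeness: the agent_service watch above drives the hash-based blocking internally. The manual client-side equivalent using the new api.Agent().Service method looks roughly like the sketch below, assuming the QueryOptions.WaitHash and QueryMeta.LastContentHash fields exercised in api/agent_test.go earlier in this diff; the service ID is illustrative.

// Illustration only: poll the new /v1/agent/service/:service_id endpoint with
// hash-based blocking from a client program.
package main

import (
	"log"
	"time"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	opts := &api.QueryOptions{WaitTime: 10 * time.Minute}
	for {
		svc, qm, err := client.Agent().Service("web-sidecar-proxy", opts)
		if err != nil {
			// Back off briefly on errors (e.g. service not registered yet).
			time.Sleep(time.Second)
			continue
		}
		if qm.LastContentHash != opts.WaitHash {
			log.Printf("service changed: port=%d hash=%s", svc.Port, svc.ContentHash)
		}
		// Block the next call until the hash changes again (or WaitTime expires).
		opts.WaitHash = qm.LastContentHash
	}
}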