mirror of https://github.com/status-im/consul.git
Agent Connect Proxy config endpoint with hash-based blocking
This commit is contained in:
parent
3e3f0e1f31
commit
2a69663448
|
@ -7,6 +7,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
|
@ -26,6 +27,7 @@ import (
|
||||||
|
|
||||||
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
|
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
|
||||||
"github.com/mitchellh/go-testing-interface"
|
"github.com/mitchellh/go-testing-interface"
|
||||||
|
"github.com/mitchellh/hashstructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Self struct {
|
type Self struct {
|
||||||
|
@ -896,6 +898,123 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.
|
||||||
return &reply, nil
|
return &reply, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GET /v1/agent/connect/proxy/:proxy_service_id
|
||||||
|
//
|
||||||
|
// Returns the local proxy config for the identified proxy. Requires token=
|
||||||
|
// param with the correct local ProxyToken (not ACL token).
|
||||||
|
func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
|
// Get the proxy ID. Note that this is the ID of a proxy's service instance.
|
||||||
|
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/proxy/")
|
||||||
|
|
||||||
|
// Maybe block
|
||||||
|
var queryOpts structs.QueryOptions
|
||||||
|
if parseWait(resp, req, &queryOpts) {
|
||||||
|
// parseWait returns an error itself
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse hash specially since it's only this endpoint that uses it currently.
|
||||||
|
// Eventually this should happen in parseWait and end up in QueryOptions but I
|
||||||
|
// didn't want to make very general changes right away.
|
||||||
|
hash := req.URL.Query().Get("hash")
|
||||||
|
|
||||||
|
return s.agentLocalBlockingQuery(hash, &queryOpts,
|
||||||
|
func(updateCh chan struct{}) (string, interface{}, error) {
|
||||||
|
// Retrieve the proxy specified
|
||||||
|
proxy := s.agent.State.Proxy(id)
|
||||||
|
if proxy == nil {
|
||||||
|
resp.WriteHeader(http.StatusNotFound)
|
||||||
|
fmt.Fprintf(resp, "unknown proxy service ID: %s", id)
|
||||||
|
return "", nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup the target service as a convenience
|
||||||
|
target := s.agent.State.Service(proxy.Proxy.TargetServiceID)
|
||||||
|
if target == nil {
|
||||||
|
// Not found since this endpoint is only useful for agent-managed proxies so
|
||||||
|
// service missing means the service was deregistered racily with this call.
|
||||||
|
resp.WriteHeader(http.StatusNotFound)
|
||||||
|
fmt.Fprintf(resp, "unknown target service ID: %s", proxy.Proxy.TargetServiceID)
|
||||||
|
return "", nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup "watch" on the proxy being modified and respond on chan if it is.
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-updateCh:
|
||||||
|
// blocking query timedout or was cancelled. Abort
|
||||||
|
return
|
||||||
|
case <-proxy.WatchCh:
|
||||||
|
// Proxy was updated or removed, report it
|
||||||
|
updateCh <- struct{}{}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
hash, err := hashstructure.Hash(proxy.Proxy, nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, err
|
||||||
|
}
|
||||||
|
contentHash := fmt.Sprintf("%x", hash)
|
||||||
|
|
||||||
|
reply := &structs.ConnectManageProxyResponse{
|
||||||
|
ProxyServiceID: proxy.Proxy.ProxyService.ID,
|
||||||
|
TargetServiceID: target.ID,
|
||||||
|
TargetServiceName: target.Service,
|
||||||
|
ContentHash: contentHash,
|
||||||
|
ExecMode: proxy.Proxy.ExecMode.String(),
|
||||||
|
Command: proxy.Proxy.Command,
|
||||||
|
Config: proxy.Proxy.Config,
|
||||||
|
}
|
||||||
|
return contentHash, reply, nil
|
||||||
|
})
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type agentLocalBlockingFunc func(updateCh chan struct{}) (string, interface{}, error)
|
||||||
|
|
||||||
|
func (s *HTTPServer) agentLocalBlockingQuery(hash string,
|
||||||
|
queryOpts *structs.QueryOptions, fn agentLocalBlockingFunc) (interface{}, error) {
|
||||||
|
|
||||||
|
var timer *time.Timer
|
||||||
|
|
||||||
|
if hash != "" {
|
||||||
|
// TODO(banks) at least define these defaults somewhere in a const. Would be
|
||||||
|
// nice not to duplicate the ones in consul/rpc.go too...
|
||||||
|
wait := queryOpts.MaxQueryTime
|
||||||
|
if wait == 0 {
|
||||||
|
wait = 5 * time.Minute
|
||||||
|
}
|
||||||
|
if wait > 10*time.Minute {
|
||||||
|
wait = 10 * time.Minute
|
||||||
|
}
|
||||||
|
// Apply a small amount of jitter to the request.
|
||||||
|
wait += lib.RandomStagger(wait / 16)
|
||||||
|
timer = time.NewTimer(wait)
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
|
||||||
|
for {
|
||||||
|
curHash, curResp, err := fn(ch)
|
||||||
|
if err != nil {
|
||||||
|
return curResp, err
|
||||||
|
}
|
||||||
|
// Hash was passed and matches current one, wait for update or timeout.
|
||||||
|
if timer != nil && hash == curHash {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
// Update happened, loop to fetch a new value
|
||||||
|
continue
|
||||||
|
case <-timer.C:
|
||||||
|
// Timeout, stop the watcher goroutine and return what we have
|
||||||
|
close(ch)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return curResp, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// AgentConnectAuthorize
|
// AgentConnectAuthorize
|
||||||
//
|
//
|
||||||
// POST /v1/agent/connect/authorize
|
// POST /v1/agent/connect/authorize
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/hashicorp/consul/testutil/retry"
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
|
"github.com/mitchellh/copystructure"
|
||||||
"github.com/pascaldekloe/goe/verify"
|
"github.com/pascaldekloe/goe/verify"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -1428,9 +1429,9 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) {
|
||||||
// Ensure proxy itself was registered
|
// Ensure proxy itself was registered
|
||||||
proxy := a.State.Proxy("web-proxy")
|
proxy := a.State.Proxy("web-proxy")
|
||||||
require.NotNil(proxy)
|
require.NotNil(proxy)
|
||||||
assert.Equal(structs.ProxyExecModeScript, proxy.ExecMode)
|
assert.Equal(structs.ProxyExecModeScript, proxy.Proxy.ExecMode)
|
||||||
assert.Equal("proxy.sh", proxy.Command)
|
assert.Equal("proxy.sh", proxy.Proxy.Command)
|
||||||
assert.Equal(args.Connect.Proxy.Config, proxy.Config)
|
assert.Equal(args.Connect.Proxy.Config, proxy.Proxy.Config)
|
||||||
|
|
||||||
// Ensure the token was configured
|
// Ensure the token was configured
|
||||||
assert.Equal("abc123", a.State.ServiceToken("web"))
|
assert.Equal("abc123", a.State.ServiceToken("web"))
|
||||||
|
@ -2200,6 +2201,167 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
|
||||||
// TODO(mitchellh): verify the private key matches the cert
|
// TODO(mitchellh): verify the private key matches the cert
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAgentConnectProxy(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
// Define a local service with a managed proxy. It's registered in the test
|
||||||
|
// loop to make sure agent state is predictable whatever order tests execute
|
||||||
|
// since some alter this service config.
|
||||||
|
reg := &structs.ServiceDefinition{
|
||||||
|
Name: "test",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Port: 8000,
|
||||||
|
Check: structs.CheckType{
|
||||||
|
TTL: 15 * time.Second,
|
||||||
|
},
|
||||||
|
Connect: &structs.ServiceDefinitionConnect{
|
||||||
|
Proxy: &structs.ServiceDefinitionConnectProxy{
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"bind_port": 1234,
|
||||||
|
"connect_timeout_ms": 500,
|
||||||
|
"upstreams": []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"destination_name": "db",
|
||||||
|
"local_port": 3131,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedResponse := &structs.ConnectManageProxyResponse{
|
||||||
|
ProxyServiceID: "test-proxy",
|
||||||
|
TargetServiceID: "test",
|
||||||
|
TargetServiceName: "test",
|
||||||
|
ContentHash: "a15dccb216d38a6e",
|
||||||
|
ExecMode: "daemon",
|
||||||
|
Command: "",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"upstreams": []interface{}{
|
||||||
|
map[string]interface{}{
|
||||||
|
"destination_name": "db",
|
||||||
|
"local_port": float64(3131),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"bind_port": float64(1234),
|
||||||
|
"connect_timeout_ms": float64(500),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ur, err := copystructure.Copy(expectedResponse)
|
||||||
|
require.NoError(t, err)
|
||||||
|
updatedResponse := ur.(*structs.ConnectManageProxyResponse)
|
||||||
|
updatedResponse.ContentHash = "22bc9233a52c08fd"
|
||||||
|
upstreams := updatedResponse.Config["upstreams"].([]interface{})
|
||||||
|
upstreams = append(upstreams,
|
||||||
|
map[string]interface{}{
|
||||||
|
"destination_name": "cache",
|
||||||
|
"local_port": float64(4242),
|
||||||
|
})
|
||||||
|
updatedResponse.Config["upstreams"] = upstreams
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
url string
|
||||||
|
updateFunc func()
|
||||||
|
wantWait time.Duration
|
||||||
|
wantCode int
|
||||||
|
wantErr bool
|
||||||
|
wantResp *structs.ConnectManageProxyResponse
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple fetch",
|
||||||
|
url: "/v1/agent/connect/proxy/test-proxy",
|
||||||
|
wantCode: 200,
|
||||||
|
wantErr: false,
|
||||||
|
wantResp: expectedResponse,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "blocking fetch timeout, no change",
|
||||||
|
url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e&wait=100ms",
|
||||||
|
wantWait: 100 * time.Millisecond,
|
||||||
|
wantCode: 200,
|
||||||
|
wantErr: false,
|
||||||
|
wantResp: expectedResponse,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "blocking fetch old hash should return immediately",
|
||||||
|
url: "/v1/agent/connect/proxy/test-proxy?hash=123456789abcd&wait=10m",
|
||||||
|
wantCode: 200,
|
||||||
|
wantErr: false,
|
||||||
|
wantResp: expectedResponse,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "blocking fetch returns change",
|
||||||
|
url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e",
|
||||||
|
updateFunc: func() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
// Re-register with new proxy config
|
||||||
|
r2, err := copystructure.Copy(reg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
reg2 := r2.(*structs.ServiceDefinition)
|
||||||
|
reg2.Connect.Proxy.Config = updatedResponse.Config
|
||||||
|
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(r2))
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
_, err = a.srv.AgentRegisterService(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
|
||||||
|
},
|
||||||
|
wantWait: 100 * time.Millisecond,
|
||||||
|
wantCode: 200,
|
||||||
|
wantErr: false,
|
||||||
|
wantResp: updatedResponse,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
// Register the basic service to ensure it's in a known state to start.
|
||||||
|
{
|
||||||
|
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg))
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
_, err := a.srv.AgentRegisterService(resp, req)
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal(200, resp.Code, "body: %s", resp.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
req, _ := http.NewRequest("GET", tt.url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
if tt.updateFunc != nil {
|
||||||
|
go tt.updateFunc()
|
||||||
|
}
|
||||||
|
start := time.Now()
|
||||||
|
obj, err := a.srv.AgentConnectProxyConfig(resp, req)
|
||||||
|
elapsed := time.Now().Sub(start)
|
||||||
|
|
||||||
|
if tt.wantErr {
|
||||||
|
require.Error(err)
|
||||||
|
} else {
|
||||||
|
require.NoError(err)
|
||||||
|
}
|
||||||
|
if tt.wantCode != 0 {
|
||||||
|
require.Equal(tt.wantCode, resp.Code, "body: %s", resp.Body.String())
|
||||||
|
}
|
||||||
|
if tt.wantWait != 0 {
|
||||||
|
assert.True(elapsed >= tt.wantWait, "should have waited at least %s, "+
|
||||||
|
"took %s", tt.wantWait, elapsed)
|
||||||
|
} else {
|
||||||
|
assert.True(elapsed < 10*time.Millisecond, "should not have waited, "+
|
||||||
|
"took %s", elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(tt.wantResp, obj)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAgentConnectAuthorize_badBody(t *testing.T) {
|
func TestAgentConnectAuthorize_badBody(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|
|
@ -125,6 +125,10 @@ type ManagedProxy struct {
|
||||||
// be exposed any other way. Unmanaged proxies will never see this and need to
|
// be exposed any other way. Unmanaged proxies will never see this and need to
|
||||||
// use service-scoped ACL tokens distributed externally.
|
// use service-scoped ACL tokens distributed externally.
|
||||||
ProxyToken string
|
ProxyToken string
|
||||||
|
|
||||||
|
// WatchCh is a close-only chan that is closed when the proxy is removed or
|
||||||
|
// updated.
|
||||||
|
WatchCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// State is used to represent the node's services,
|
// State is used to represent the node's services,
|
||||||
|
@ -171,7 +175,7 @@ type State struct {
|
||||||
// tokens contains the ACL tokens
|
// tokens contains the ACL tokens
|
||||||
tokens *token.Store
|
tokens *token.Store
|
||||||
|
|
||||||
// managedProxies is a map of all manged connect proxies registered locally on
|
// managedProxies is a map of all managed connect proxies registered locally on
|
||||||
// this agent. This is NOT kept in sync with servers since it's agent-local
|
// this agent. This is NOT kept in sync with servers since it's agent-local
|
||||||
// config only. Proxy instances have separate service registrations in the
|
// config only. Proxy instances have separate service registrations in the
|
||||||
// services map above which are kept in sync via anti-entropy. Un-managed
|
// services map above which are kept in sync via anti-entropy. Un-managed
|
||||||
|
@ -633,9 +637,17 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
|
||||||
proxy.ProxyService = svc
|
proxy.ProxyService = svc
|
||||||
|
|
||||||
// All set, add the proxy and return the service
|
// All set, add the proxy and return the service
|
||||||
|
if old, ok := l.managedProxies[svc.ID]; ok {
|
||||||
|
// Notify watchers of the existing proxy config that it's changing. Note
|
||||||
|
// this is safe here even before the map is updated since we still hold the
|
||||||
|
// state lock and the watcher can't re-read the new config until we return
|
||||||
|
// anyway.
|
||||||
|
close(old.WatchCh)
|
||||||
|
}
|
||||||
l.managedProxies[svc.ID] = &ManagedProxy{
|
l.managedProxies[svc.ID] = &ManagedProxy{
|
||||||
Proxy: proxy,
|
Proxy: proxy,
|
||||||
ProxyToken: pToken,
|
ProxyToken: pToken,
|
||||||
|
WatchCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// No need to trigger sync as proxy state is local only.
|
// No need to trigger sync as proxy state is local only.
|
||||||
|
@ -653,49 +665,32 @@ func (l *State) RemoveProxy(id string) error {
|
||||||
}
|
}
|
||||||
delete(l.managedProxies, id)
|
delete(l.managedProxies, id)
|
||||||
|
|
||||||
|
// Notify watchers of the existing proxy config that it's changed.
|
||||||
|
close(p.WatchCh)
|
||||||
|
|
||||||
// No need to trigger sync as proxy state is local only.
|
// No need to trigger sync as proxy state is local only.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxy returns the local proxy state.
|
// Proxy returns the local proxy state.
|
||||||
func (l *State) Proxy(id string) *structs.ConnectManagedProxy {
|
func (l *State) Proxy(id string) *ManagedProxy {
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
|
return l.managedProxies[id]
|
||||||
p := l.managedProxies[id]
|
|
||||||
if p == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return p.Proxy
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxies returns the locally registered proxies.
|
// Proxies returns the locally registered proxies.
|
||||||
func (l *State) Proxies() map[string]*structs.ConnectManagedProxy {
|
func (l *State) Proxies() map[string]*ManagedProxy {
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
|
|
||||||
m := make(map[string]*structs.ConnectManagedProxy)
|
m := make(map[string]*ManagedProxy)
|
||||||
for id, p := range l.managedProxies {
|
for id, p := range l.managedProxies {
|
||||||
m[id] = p.Proxy
|
m[id] = p
|
||||||
}
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProxyToken returns the local proxy token for a given proxy. Note this is not
|
|
||||||
// an ACL token so it won't fallback to using the agent-configured default ACL
|
|
||||||
// token. If the proxy doesn't exist an error is returned, otherwise the token
|
|
||||||
// is guaranteed to exist.
|
|
||||||
func (l *State) ProxyToken(id string) (string, error) {
|
|
||||||
l.RLock()
|
|
||||||
defer l.RUnlock()
|
|
||||||
|
|
||||||
p := l.managedProxies[id]
|
|
||||||
if p == nil {
|
|
||||||
return "", fmt.Errorf("proxy %s not registered", id)
|
|
||||||
}
|
|
||||||
return p.ProxyToken, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Metadata returns the local node metadata fields that the
|
// Metadata returns the local node metadata fields that the
|
||||||
// agent is aware of and are being kept in sync with the server
|
// agent is aware of and are being kept in sync with the server
|
||||||
func (l *State) Metadata() map[string]string {
|
func (l *State) Metadata() map[string]string {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -1673,8 +1674,8 @@ func TestStateProxyManagement(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
state := local.NewState(local.Config{
|
state := local.NewState(local.Config{
|
||||||
ProxyPortRangeStart: 20000,
|
ProxyBindMinPort: 20000,
|
||||||
ProxyPortRangeEnd: 20002,
|
ProxyBindMaxPort: 20001,
|
||||||
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
||||||
|
|
||||||
// Stub state syncing
|
// Stub state syncing
|
||||||
|
@ -1707,6 +1708,20 @@ func TestStateProxyManagement(t *testing.T) {
|
||||||
}, "fake-token-db")
|
}, "fake-token-db")
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Record initial local modify index
|
||||||
|
lastModifyIndex := state.LocalModifyIndex()
|
||||||
|
assertModIndexUpdate := func(id string) {
|
||||||
|
t.Helper()
|
||||||
|
nowIndex := state.LocalModifyIndex()
|
||||||
|
assert.True(lastModifyIndex < nowIndex)
|
||||||
|
if id != "" {
|
||||||
|
p := state.Proxy(id)
|
||||||
|
require.NotNil(p)
|
||||||
|
assert.True(lastModifyIndex < p.ModifyIndex)
|
||||||
|
}
|
||||||
|
lastModifyIndex = nowIndex
|
||||||
|
}
|
||||||
|
|
||||||
// Should work now
|
// Should work now
|
||||||
svc, err := state.AddProxy(&p1, "fake-token")
|
svc, err := state.AddProxy(&p1, "fake-token")
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
@ -1718,6 +1733,7 @@ func TestStateProxyManagement(t *testing.T) {
|
||||||
assert.Equal("", svc.Address, "should have empty address by default")
|
assert.Equal("", svc.Address, "should have empty address by default")
|
||||||
// Port is non-deterministic but could be either of 20000 or 20001
|
// Port is non-deterministic but could be either of 20000 or 20001
|
||||||
assert.Contains([]int{20000, 20001}, svc.Port)
|
assert.Contains([]int{20000, 20001}, svc.Port)
|
||||||
|
assertModIndexUpdate(svc.ID)
|
||||||
|
|
||||||
// Second proxy should claim other port
|
// Second proxy should claim other port
|
||||||
p2 := p1
|
p2 := p1
|
||||||
|
@ -1726,10 +1742,10 @@ func TestStateProxyManagement(t *testing.T) {
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
assert.Contains([]int{20000, 20001}, svc2.Port)
|
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||||
assert.NotEqual(svc.Port, svc2.Port)
|
assert.NotEqual(svc.Port, svc2.Port)
|
||||||
|
assertModIndexUpdate(svc2.ID)
|
||||||
|
|
||||||
// Just saving this for later...
|
// Store this for later
|
||||||
p2Token, err := state.ProxyToken(svc2.ID)
|
p2token := state.Proxy(svc2.ID).ProxyToken
|
||||||
require.NoError(err)
|
|
||||||
|
|
||||||
// Third proxy should fail as all ports are used
|
// Third proxy should fail as all ports are used
|
||||||
p3 := p1
|
p3 := p1
|
||||||
|
@ -1746,6 +1762,32 @@ func TestStateProxyManagement(t *testing.T) {
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal("0.0.0.0", svc3.Address)
|
require.Equal("0.0.0.0", svc3.Address)
|
||||||
require.Equal(1234, svc3.Port)
|
require.Equal(1234, svc3.Port)
|
||||||
|
assertModIndexUpdate(svc3.ID)
|
||||||
|
|
||||||
|
// Update config of an already registered proxy should work
|
||||||
|
p3updated := p3
|
||||||
|
p3updated.Config["foo"] = "bar"
|
||||||
|
// Setup multiple watchers who should all witness the change
|
||||||
|
gotP3 := state.Proxy(svc3.ID)
|
||||||
|
require.NotNil(gotP3)
|
||||||
|
var watchWg sync.WaitGroup
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
watchWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
<-gotP3.WatchCh
|
||||||
|
watchWg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
svc3, err = state.AddProxy(&p3updated, "fake-token")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal("0.0.0.0", svc3.Address)
|
||||||
|
require.Equal(1234, svc3.Port)
|
||||||
|
gotProxy3 := state.Proxy(svc3.ID)
|
||||||
|
require.NotNil(gotProxy3)
|
||||||
|
require.Equal(p3updated.Config, gotProxy3.Proxy.Config)
|
||||||
|
assertModIndexUpdate(svc3.ID) // update must change mod index
|
||||||
|
// All watchers should have fired so this should not hang the test!
|
||||||
|
watchWg.Wait()
|
||||||
|
|
||||||
// Remove one of the auto-assigned proxies
|
// Remove one of the auto-assigned proxies
|
||||||
err = state.RemoveProxy(svc2.ID)
|
err = state.RemoveProxy(svc2.ID)
|
||||||
|
@ -1758,31 +1800,29 @@ func TestStateProxyManagement(t *testing.T) {
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
assert.Contains([]int{20000, 20001}, svc2.Port)
|
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||||
assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed")
|
assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed")
|
||||||
|
assertModIndexUpdate(svc4.ID)
|
||||||
|
|
||||||
// Remove a proxy that doesn't exist should error
|
// Remove a proxy that doesn't exist should error
|
||||||
err = state.RemoveProxy("nope")
|
err = state.RemoveProxy("nope")
|
||||||
require.Error(err)
|
require.Error(err)
|
||||||
|
|
||||||
assert.Equal(&p4, state.Proxy(p4.ProxyService.ID),
|
assert.Equal(&p4, state.Proxy(p4.ProxyService.ID).Proxy,
|
||||||
"should fetch the right proxy details")
|
"should fetch the right proxy details")
|
||||||
assert.Nil(state.Proxy("nope"))
|
assert.Nil(state.Proxy("nope"))
|
||||||
|
|
||||||
proxies := state.Proxies()
|
proxies := state.Proxies()
|
||||||
assert.Len(proxies, 3)
|
assert.Len(proxies, 3)
|
||||||
assert.Equal(&p1, proxies[svc.ID])
|
assert.Equal(&p1, proxies[svc.ID].Proxy)
|
||||||
assert.Equal(&p4, proxies[svc4.ID])
|
assert.Equal(&p4, proxies[svc4.ID].Proxy)
|
||||||
assert.Equal(&p3, proxies[svc3.ID])
|
assert.Equal(&p3, proxies[svc3.ID].Proxy)
|
||||||
|
|
||||||
tokens := make([]string, 4)
|
tokens := make([]string, 4)
|
||||||
tokens[0], err = state.ProxyToken(svc.ID)
|
tokens[0] = state.Proxy(svc.ID).ProxyToken
|
||||||
require.NoError(err)
|
|
||||||
// p2 not registered anymore but lets make sure p4 got a new token when it
|
// p2 not registered anymore but lets make sure p4 got a new token when it
|
||||||
// re-registered with same ID.
|
// re-registered with same ID.
|
||||||
tokens[1] = p2Token
|
tokens[1] = p2token
|
||||||
tokens[2], err = state.ProxyToken(svc3.ID)
|
tokens[2] = state.Proxy(svc2.ID).ProxyToken
|
||||||
require.NoError(err)
|
tokens[3] = state.Proxy(svc3.ID).ProxyToken
|
||||||
tokens[3], err = state.ProxyToken(svc4.ID)
|
|
||||||
require.NoError(err)
|
|
||||||
|
|
||||||
// Quick check all are distinct
|
// Quick check all are distinct
|
||||||
for i := 0; i < len(tokens)-1; i++ {
|
for i := 0; i < len(tokens)-1; i++ {
|
||||||
|
|
|
@ -32,6 +32,18 @@ const (
|
||||||
ProxyExecModeScript
|
ProxyExecModeScript
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// String implements Stringer
|
||||||
|
func (m ProxyExecMode) String() string {
|
||||||
|
switch m {
|
||||||
|
case ProxyExecModeDaemon:
|
||||||
|
return "daemon"
|
||||||
|
case ProxyExecModeScript:
|
||||||
|
return "script"
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ConnectManagedProxy represents the agent-local state for a configured proxy
|
// ConnectManagedProxy represents the agent-local state for a configured proxy
|
||||||
// instance. This is never stored or sent to the servers and is only used to
|
// instance. This is never stored or sent to the servers and is only used to
|
||||||
// store the config for the proxy that the agent needs to track. For now it's
|
// store the config for the proxy that the agent needs to track. For now it's
|
||||||
|
@ -91,3 +103,16 @@ func (p *ConnectManagedProxy) ParseConfig() (*ConnectManagedProxyConfig, error)
|
||||||
}
|
}
|
||||||
return &cfg, nil
|
return &cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectManageProxyResponse is the public response object we return for
|
||||||
|
// queries on local proxy config state. It's similar to ConnectManagedProxy but
|
||||||
|
// with some fields re-arranged.
|
||||||
|
type ConnectManageProxyResponse struct {
|
||||||
|
ProxyServiceID string
|
||||||
|
TargetServiceID string
|
||||||
|
TargetServiceName string
|
||||||
|
ContentHash string
|
||||||
|
ExecMode string
|
||||||
|
Command string
|
||||||
|
Config map[string]interface{}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue