mirror of
https://github.com/status-im/consul.git
synced 2025-01-24 04:31:12 +00:00
e812f5516a
- A new endpoint `/v1/agent/service/:service_id` which is a generic way to look up the service for a single instance. The primary value here is that it: - **supports hash-based blocking** and so; - **replaces `/agent/connect/proxy/:proxy_id`** as the mechanism the built-in proxy uses to read its config. - It's not proxy specific and so works for any service. - It has a temporary shim to call through to the existing endpoint to preserve current managed proxy config defaulting behaviour until that is removed entirely (tested). - The built-in proxy now uses the new endpoint exclusively for its config - The built-in proxy now has a `-sidecar-for` flag that allows the service ID of the _target_ service to be specified, on the condition that there is exactly one "sidecar" proxy (that is one that has `Proxy.DestinationServiceID` set) for the service registered. - Several fixes for edge cases for SidecarService - A fix for `Alias` checks - when running locally they didn't update their state until some external thing updated the target. If the target service has no checks registered as below, then the alias never made it past critical.
281 lines
6.8 KiB
Go
281 lines
6.8 KiB
Go
package proxy
|
|
|
|
import (
|
|
"log"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/hashicorp/consul/agent"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/connect"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestUpstreamResolverFuncFromClient(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
tests := []struct {
|
|
name string
|
|
cfg UpstreamConfig
|
|
want *connect.ConsulResolver
|
|
}{
|
|
{
|
|
name: "service",
|
|
cfg: UpstreamConfig{
|
|
DestinationNamespace: "foo",
|
|
DestinationName: "web",
|
|
Datacenter: "ny1",
|
|
DestinationType: "service",
|
|
},
|
|
want: &connect.ConsulResolver{
|
|
Namespace: "foo",
|
|
Name: "web",
|
|
Datacenter: "ny1",
|
|
Type: connect.ConsulResolverTypeService,
|
|
},
|
|
},
|
|
{
|
|
name: "prepared_query",
|
|
cfg: UpstreamConfig{
|
|
DestinationNamespace: "foo",
|
|
DestinationName: "web",
|
|
Datacenter: "ny1",
|
|
DestinationType: "prepared_query",
|
|
},
|
|
want: &connect.ConsulResolver{
|
|
Namespace: "foo",
|
|
Name: "web",
|
|
Datacenter: "ny1",
|
|
Type: connect.ConsulResolverTypePreparedQuery,
|
|
},
|
|
},
|
|
{
|
|
name: "unknown behaves like service",
|
|
cfg: UpstreamConfig{
|
|
DestinationNamespace: "foo",
|
|
DestinationName: "web",
|
|
Datacenter: "ny1",
|
|
DestinationType: "junk",
|
|
},
|
|
want: &connect.ConsulResolver{
|
|
Namespace: "foo",
|
|
Name: "web",
|
|
Datacenter: "ny1",
|
|
Type: connect.ConsulResolverTypeService,
|
|
},
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
// Client doesn't really matter as long as it's passed through.
|
|
gotFn := UpstreamResolverFuncFromClient(nil)
|
|
got, err := gotFn(tt.cfg)
|
|
require.NoError(t, err)
|
|
require.Equal(t, tt.want, got)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestAgentConfigWatcherManagedProxy(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
a := agent.NewTestAgent("agent_smith", `
|
|
connect {
|
|
enabled = true
|
|
proxy {
|
|
allow_managed_api_registration = true
|
|
}
|
|
}
|
|
`)
|
|
defer a.Shutdown()
|
|
|
|
client := a.Client()
|
|
agent := client.Agent()
|
|
|
|
// Register a local agent service with a managed proxy
|
|
reg := &api.AgentServiceRegistration{
|
|
Name: "web",
|
|
Port: 8080,
|
|
Connect: &api.AgentServiceConnect{
|
|
Proxy: &api.AgentServiceConnectProxy{
|
|
Config: map[string]interface{}{
|
|
"bind_address": "10.10.10.10",
|
|
"bind_port": 1010,
|
|
"local_service_address": "127.0.0.1:5000",
|
|
"handshake_timeout_ms": 999,
|
|
},
|
|
Upstreams: []api.Upstream{
|
|
{
|
|
DestinationName: "db",
|
|
LocalBindPort: 9191,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
err := agent.ServiceRegister(reg)
|
|
require.NoError(t, err)
|
|
|
|
w, err := NewAgentConfigWatcher(client, "web-proxy",
|
|
log.New(os.Stderr, "", log.LstdFlags))
|
|
require.NoError(t, err)
|
|
|
|
cfg := testGetConfigValTimeout(t, w, 500*time.Millisecond)
|
|
|
|
expectCfg := &Config{
|
|
ProxiedServiceName: "web",
|
|
ProxiedServiceNamespace: "default",
|
|
PublicListener: PublicListenerConfig{
|
|
BindAddress: "10.10.10.10",
|
|
BindPort: 1010,
|
|
LocalServiceAddress: "127.0.0.1:5000",
|
|
HandshakeTimeoutMs: 999,
|
|
LocalConnectTimeoutMs: 1000, // from applyDefaults
|
|
},
|
|
Upstreams: []UpstreamConfig{
|
|
{
|
|
DestinationName: "db",
|
|
DestinationNamespace: "default",
|
|
DestinationType: "service",
|
|
LocalBindPort: 9191,
|
|
LocalBindAddress: "127.0.0.1",
|
|
},
|
|
},
|
|
}
|
|
|
|
assert.Equal(t, expectCfg, cfg)
|
|
|
|
// Now keep watching and update the config.
|
|
go func() {
|
|
// Wait for watcher to be watching
|
|
time.Sleep(20 * time.Millisecond)
|
|
reg.Connect.Proxy.Upstreams = append(reg.Connect.Proxy.Upstreams,
|
|
api.Upstream{
|
|
DestinationName: "cache",
|
|
LocalBindPort: 9292,
|
|
LocalBindAddress: "127.10.10.10",
|
|
})
|
|
reg.Connect.Proxy.Config["local_connect_timeout_ms"] = 444
|
|
err := agent.ServiceRegister(reg)
|
|
require.NoError(t, err)
|
|
}()
|
|
|
|
cfg = testGetConfigValTimeout(t, w, 2*time.Second)
|
|
|
|
expectCfg.Upstreams = append(expectCfg.Upstreams, UpstreamConfig{
|
|
DestinationName: "cache",
|
|
DestinationNamespace: "default",
|
|
DestinationType: "service",
|
|
LocalBindPort: 9292,
|
|
LocalBindAddress: "127.10.10.10",
|
|
})
|
|
expectCfg.PublicListener.LocalConnectTimeoutMs = 444
|
|
|
|
assert.Equal(t, expectCfg, cfg)
|
|
}
|
|
|
|
func TestAgentConfigWatcherSidecarProxy(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
a := agent.NewTestAgent("agent_smith", ``)
|
|
defer a.Shutdown()
|
|
|
|
client := a.Client()
|
|
agent := client.Agent()
|
|
|
|
// Register a local agent service with a managed proxy
|
|
reg := &api.AgentServiceRegistration{
|
|
Name: "web",
|
|
Port: 8080,
|
|
Connect: &api.AgentServiceConnect{
|
|
SidecarService: &api.AgentServiceRegistration{
|
|
Proxy: &api.AgentServiceConnectProxyConfig{
|
|
Config: map[string]interface{}{
|
|
"handshake_timeout_ms": 999,
|
|
},
|
|
Upstreams: []api.Upstream{
|
|
{
|
|
DestinationName: "db",
|
|
LocalBindPort: 9191,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
err := agent.ServiceRegister(reg)
|
|
require.NoError(t, err)
|
|
|
|
w, err := NewAgentConfigWatcher(client, "web-sidecar-proxy",
|
|
log.New(os.Stderr, "", log.LstdFlags))
|
|
require.NoError(t, err)
|
|
|
|
cfg := testGetConfigValTimeout(t, w, 500*time.Millisecond)
|
|
|
|
expectCfg := &Config{
|
|
ProxiedServiceName: "web",
|
|
ProxiedServiceNamespace: "default",
|
|
PublicListener: PublicListenerConfig{
|
|
BindAddress: "0.0.0.0",
|
|
BindPort: 21000,
|
|
LocalServiceAddress: "127.0.0.1:8080",
|
|
HandshakeTimeoutMs: 999,
|
|
LocalConnectTimeoutMs: 1000, // from applyDefaults
|
|
},
|
|
Upstreams: []UpstreamConfig{
|
|
{
|
|
DestinationName: "db",
|
|
DestinationNamespace: "default",
|
|
DestinationType: "service",
|
|
LocalBindPort: 9191,
|
|
LocalBindAddress: "127.0.0.1",
|
|
},
|
|
},
|
|
}
|
|
|
|
require.Equal(t, expectCfg, cfg)
|
|
|
|
// Now keep watching and update the config.
|
|
go func() {
|
|
// Wait for watcher to be watching
|
|
time.Sleep(20 * time.Millisecond)
|
|
reg.Connect.SidecarService.Proxy.Upstreams = append(reg.Connect.SidecarService.Proxy.Upstreams,
|
|
api.Upstream{
|
|
DestinationName: "cache",
|
|
LocalBindPort: 9292,
|
|
LocalBindAddress: "127.10.10.10",
|
|
})
|
|
reg.Connect.SidecarService.Proxy.Config["local_connect_timeout_ms"] = 444
|
|
err := agent.ServiceRegister(reg)
|
|
require.NoError(t, err)
|
|
}()
|
|
|
|
cfg = testGetConfigValTimeout(t, w, 2*time.Second)
|
|
|
|
expectCfg.Upstreams = append(expectCfg.Upstreams, UpstreamConfig{
|
|
DestinationName: "cache",
|
|
DestinationNamespace: "default",
|
|
DestinationType: "service",
|
|
LocalBindPort: 9292,
|
|
LocalBindAddress: "127.10.10.10",
|
|
})
|
|
expectCfg.PublicListener.LocalConnectTimeoutMs = 444
|
|
|
|
assert.Equal(t, expectCfg, cfg)
|
|
}
|
|
|
|
func testGetConfigValTimeout(t *testing.T, w ConfigWatcher,
|
|
timeout time.Duration) *Config {
|
|
t.Helper()
|
|
select {
|
|
case cfg := <-w.Watch():
|
|
return cfg
|
|
case <-time.After(timeout):
|
|
t.Fatalf("timeout after %s waiting for config update", timeout)
|
|
return nil
|
|
}
|
|
}
|