consul/watch/funcs_test.go
Paul Banks e812f5516a Add -sidecar-for and new /agent/service/:service_id endpoint (#4691)
- A new endpoint `/v1/agent/service/:service_id` which is a generic way to look up the service for a single instance. The primary value here is that it:
   - **supports hash-based blocking** and so;
   - **replaces `/agent/connect/proxy/:proxy_id`** as the mechanism the built-in proxy uses to read its config.
   - It's not proxy specific and so works for any service.
   - It has a temporary shim to call through to the existing endpoint to preserve current managed proxy config defaulting behaviour until that is removed entirely (tested).
 - The built-in proxy now uses the new endpoint exclusively for its config
 - The built-in proxy now has a `-sidecar-for` flag that allows the service ID of the _target_ service to be specified, on the condition that there is exactly one "sidecar" proxy (that is one that has `Proxy.DestinationServiceID` set) for the service registered.
 - Several fixes for edge cases for SidecarService
 - A fix for `Alias` checks - when running locally they didn't update their state until some external thing updated the target. If the target service has no checks registered as below, then the alias never made it past critical.
2018-10-10 16:55:34 +01:00

808 lines
16 KiB
Go

package watch_test
import (
"encoding/json"
"errors"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/connect"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
"github.com/stretchr/testify/require"
)
var (
	// errBadContent is reported by watch handlers when the delivered payload
	// does not match what the test wrote.
	errBadContent = errors.New("bad content")
	// errTimeout is delivered on the invoke channel when no handler reported a
	// result within timeout.
	errTimeout = errors.New("timeout")
	// timeout bounds how long a test waits for its watch handler to fire.
	timeout = 5 * time.Second
)

// makeInvokeCh returns a channel on which a watch handler reports its result
// (nil on success, an error otherwise). If nothing has been reported after
// timeout, errTimeout is delivered instead.
//
// The channel has a one-slot buffer and the timer uses a non-blocking send, so
// the timer callback can never block forever once the test has already
// consumed its result (with an unbuffered channel that callback goroutine was
// leaked after every successful test).
func makeInvokeCh() chan error {
	ch := make(chan error, 1)
	time.AfterFunc(timeout, func() {
		select {
		case ch <- errTimeout:
		default:
			// A result is already pending in the buffer; the timeout is moot.
		}
	})
	return ch
}
// TestKeyWatch runs a "key" watch plan on foo/bar/baz against a test agent and
// expects the handler to receive a KVPair whose value is "test" after that key
// is written.
func TestKeyWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(*consulapi.KVPair)
		if !ok || v == nil {
			return // ignore
		}
		if string(v.Value) != "test" {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Writer: puts the watched key after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		kv := a.Client().KV()

		time.Sleep(20 * time.Millisecond)
		pair := &consulapi.KVPair{
			Key:   "foo/bar/baz",
			Value: []byte("test"),
		}
		if _, err := kv.Put(pair, nil); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestKeyWatch_With_PrefixDelete runs a "key" watch plan on foo/bar/baz and
// expects the handler to receive a KVPair whose value is "test" after that key
// is written.
//
// NOTE(review): despite the name, this body performs no prefix delete and
// currently exercises the same steps as TestKeyWatch — confirm the intended
// coverage.
func TestKeyWatch_With_PrefixDelete(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(*consulapi.KVPair)
		if !ok || v == nil {
			return // ignore
		}
		if string(v.Value) != "test" {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Writer: puts the watched key after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		kv := a.Client().KV()

		time.Sleep(20 * time.Millisecond)
		pair := &consulapi.KVPair{
			Key:   "foo/bar/baz",
			Value: []byte("test"),
		}
		if _, err := kv.Put(pair, nil); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestKeyPrefixWatch runs a "keyprefix" watch plan on foo/ and expects the
// handler to receive a non-empty KVPairs list whose first key is "foo/bar"
// after that key is written.
func TestKeyPrefixWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(consulapi.KVPairs)
		if !ok || len(v) == 0 {
			return
		}
		if string(v[0].Key) != "foo/bar" {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Writer: puts a key under the watched prefix after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		kv := a.Client().KV()

		time.Sleep(20 * time.Millisecond)
		pair := &consulapi.KVPair{
			Key: "foo/bar",
		}
		if _, err := kv.Put(pair, nil); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestServicesWatch runs a "services" watch plan and expects the handler to
// receive a non-empty service map that contains an entry for "consul". A
// service "foo" is registered with the agent while the watch is running.
func TestServicesWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"services"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(map[string][]string)
		if !ok || len(v) == 0 {
			return // ignore
		}
		if v["consul"] == nil {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Registrar: registers service "foo" after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		agent := a.Client().Agent()

		time.Sleep(20 * time.Millisecond)
		reg := &consulapi.AgentServiceRegistration{
			ID:   "foo",
			Name: "foo",
		}
		if err := agent.ServiceRegister(reg); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestNodesWatch runs a "nodes" watch plan and succeeds as soon as the handler
// receives a non-empty node list. A node "foobar" is registered in the catalog
// while the watch is running.
func TestNodesWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"nodes"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.([]*consulapi.Node)
		if !ok || len(v) == 0 {
			return // ignore
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Registrar: adds node "foobar" to the catalog after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		catalog := a.Client().Catalog()

		time.Sleep(20 * time.Millisecond)
		reg := &consulapi.CatalogRegistration{
			Node:       "foobar",
			Address:    "1.1.1.1",
			Datacenter: "dc1",
		}
		if _, err := catalog.Register(reg, nil); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestServiceWatch runs a "service" watch plan for service "foo" with tag
// "bar" and passingonly set, and expects the handler to receive a non-empty
// ServiceEntry list whose first entry has service ID "foo".
func TestServiceWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.([]*consulapi.ServiceEntry)
		if !ok || len(v) == 0 {
			return // ignore
		}
		if v[0].Service.ID != "foo" {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Registrar: registers service "foo" tagged "bar" after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		agent := a.Client().Agent()

		time.Sleep(20 * time.Millisecond)
		reg := &consulapi.AgentServiceRegistration{
			ID:   "foo",
			Name: "foo",
			Tags: []string{"bar"},
		}
		if err := agent.ServiceRegister(reg); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestChecksWatch_State runs a "checks" watch plan filtered on state "warning"
// and expects the handler to receive a non-empty HealthCheck list whose first
// entry is check "foobar" with status "warning".
func TestChecksWatch_State(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"checks", "state":"warning"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.([]*consulapi.HealthCheck)
		if !ok || len(v) == 0 {
			return // ignore
		}
		if v[0].CheckID != "foobar" || v[0].Status != "warning" {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Registrar: registers node "foobar" with a warning check after a short
	// delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		catalog := a.Client().Catalog()

		time.Sleep(20 * time.Millisecond)
		reg := &consulapi.CatalogRegistration{
			Node:       "foobar",
			Address:    "1.1.1.1",
			Datacenter: "dc1",
			Check: &consulapi.AgentCheck{
				Node:    "foobar",
				CheckID: "foobar",
				Name:    "foobar",
				Status:  consulapi.HealthWarning,
			},
		}
		if _, err := catalog.Register(reg, nil); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestChecksWatch_Service runs a "checks" watch plan filtered on service
// "foobar" and expects the handler to receive a non-empty HealthCheck list
// whose first entry is check "foobar".
func TestChecksWatch_Service(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"checks", "service":"foobar"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.([]*consulapi.HealthCheck)
		if !ok || len(v) == 0 {
			return // ignore
		}
		if v[0].CheckID != "foobar" {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Registrar: registers node, service and a passing service check, all
	// named "foobar", after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		catalog := a.Client().Catalog()

		time.Sleep(20 * time.Millisecond)
		reg := &consulapi.CatalogRegistration{
			Node:       "foobar",
			Address:    "1.1.1.1",
			Datacenter: "dc1",
			Service: &consulapi.AgentService{
				ID:      "foobar",
				Service: "foobar",
			},
			Check: &consulapi.AgentCheck{
				Node:      "foobar",
				CheckID:   "foobar",
				Name:      "foobar",
				Status:    consulapi.HealthPassing,
				ServiceID: "foobar",
			},
		}
		if _, err := catalog.Register(reg, nil); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestEventWatch runs an "event" watch plan for events named "foo" and expects
// the handler to receive a non-empty UserEvent list whose last entry is named
// "foo" after such an event is fired.
func TestEventWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"event", "name": "foo"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return
		}
		v, ok := raw.([]*consulapi.UserEvent)
		if !ok || len(v) == 0 {
			return // ignore
		}
		// Only the most recent event in the list is inspected.
		if string(v[len(v)-1].Name) != "foo" {
			invoke <- errBadContent
			return
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	// Firer: fires user event "foo" after a short delay.
	wg.Add(1)
	go func() {
		defer wg.Done()
		event := a.Client().Event()

		time.Sleep(20 * time.Millisecond)
		params := &consulapi.UserEvent{Name: "foo"}
		if _, _, err := event.Fire(params, nil); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestConnectRootsWatch runs a "connect_roots" watch plan, records the active
// root ID from the first single-root response, then sets a new CA config and
// expects a later response whose active root ID differs from the recorded one.
func TestConnectRootsWatch(t *testing.T) {
	t.Parallel()
	// NewTestAgent will bootstrap a new CA
	a := agent.NewTestAgent(t.Name(), "")
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	var originalCAID string
	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"connect_roots"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(*consulapi.CARootList)
		if !ok || v == nil {
			return // ignore
		}
		// Only 1 CA is the bootstrapped state (i.e. first response). Ignore this
		// state and wait for the new CA to show up too.
		if len(v.Roots) == 1 {
			originalCAID = v.ActiveRootID
			return
		}
		assert.NotEmpty(t, originalCAID)
		assert.NotEqual(t, originalCAID, v.ActiveRootID)
		invoke <- nil
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(20 * time.Millisecond)
		// Set a new CA
		connect.TestCAConfigSet(t, a, nil)
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestConnectLeafWatch runs a "connect_leaf" watch plan for service "web",
// stores the first leaf cert delivered, then sets a new CA config and expects
// a later cert whose PEM, private key and ModifyIndex differ from the first.
func TestConnectLeafWatch(t *testing.T) {
	t.Parallel()
	// NewTestAgent will bootstrap a new CA
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	// Register a web service to get certs for
	{
		agent := a.Client().Agent()
		reg := consulapi.AgentServiceRegistration{
			ID:   "web",
			Name: "web",
			Port: 9090,
		}
		err := agent.ServiceRegister(&reg)
		require.Nil(t, err)
	}

	var lastCert *consulapi.LeafCert

	// NOTE(review): unlike the other tests here, this channel has no timeout
	// guard, so the receive below blocks indefinitely if the handler never
	// reports — confirm this is intentional.
	//invoke := makeInvokeCh()
	invoke := make(chan error)
	plan := mustParse(t, `{"type":"connect_leaf", "service":"web"}`)
	plan.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(*consulapi.LeafCert)
		if !ok || v == nil {
			return // ignore
		}
		if lastCert == nil {
			// Initial fetch, just store the cert and return
			lastCert = v
			return
		}
		// TODO(banks): right now the root rotation actually causes Serial numbers
		// to reset so these end up all being the same. That needs fixing but it's
		// a bigger task than I want to bite off for this PR.
		//assert.NotEqual(t, lastCert.SerialNumber, v.SerialNumber)
		assert.NotEqual(t, lastCert.CertPEM, v.CertPEM)
		assert.NotEqual(t, lastCert.PrivateKeyPEM, v.PrivateKeyPEM)
		assert.NotEqual(t, lastCert.ModifyIndex, v.ModifyIndex)
		invoke <- nil
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(20 * time.Millisecond)
		// Change the CA to trigger a leaf change
		connect.TestCAConfigSet(t, a, nil)
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			// Errorf, not Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestConnectProxyConfigWatch registers service "web" with a managed proxy,
// runs a "connect_proxy_config" watch plan for "web-proxy", and succeeds as
// soon as the hybrid handler receives a non-nil ConnectProxyConfig. The proxy
// config is re-registered with changed values while the watch is running.
func TestConnectProxyConfigWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), `
connect {
enabled = true
proxy {
allow_managed_api_registration = true
}
}
`)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	// Register a local agent service with a managed proxy
	reg := &consulapi.AgentServiceRegistration{
		Name: "web",
		Port: 8080,
		Connect: &consulapi.AgentServiceConnect{
			Proxy: &consulapi.AgentServiceConnectProxy{
				Config: map[string]interface{}{
					"foo": "bar",
				},
			},
		},
	}
	client := a.Client()
	agent := client.Agent()
	err := agent.ServiceRegister(reg)
	require.NoError(t, err)

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"connect_proxy_config", "proxy_service_id":"web-proxy"}`)
	plan.HybridHandler = func(blockParamVal watch.BlockingParamVal, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(*consulapi.ConnectProxyConfig)
		if !ok || v == nil {
			return // ignore
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(20 * time.Millisecond)

		// Change the proxy's config
		reg.Connect.Proxy.Config["foo"] = "buzz"
		reg.Connect.Proxy.Config["baz"] = "qux"
		err := agent.ServiceRegister(reg)
		// assert, not require: require calls FailNow, which must only be
		// called from the goroutine running the test function.
		assert.NoError(t, err)
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			// Errorf, not Fatalf, for the same FailNow restriction.
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// TestAgentServiceWatch registers local service "web", runs an
// "agent_service" watch plan for service ID "web", and succeeds as soon as the
// hybrid handler receives a non-nil AgentService. The service is re-registered
// with a different port while the watch is running.
func TestAgentServiceWatch(t *testing.T) {
	t.Parallel()
	a := agent.NewTestAgent(t.Name(), ``)
	defer a.Shutdown()
	testrpc.WaitForTestAgent(t, a.RPC, "dc1")

	// Register a local agent service
	reg := &consulapi.AgentServiceRegistration{
		Name: "web",
		Port: 8080,
	}
	client := a.Client()
	agent := client.Agent()
	err := agent.ServiceRegister(reg)
	require.NoError(t, err)

	invoke := makeInvokeCh()
	plan := mustParse(t, `{"type":"agent_service", "service_id":"web"}`)
	plan.HybridHandler = func(blockParamVal watch.BlockingParamVal, raw interface{}) {
		if raw == nil {
			return // ignore
		}
		v, ok := raw.(*consulapi.AgentService)
		if !ok || v == nil {
			return // ignore
		}
		invoke <- nil
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(20 * time.Millisecond)

		// Change the service definition
		reg.Port = 9090
		err := agent.ServiceRegister(reg)
		// assert, not require: require calls FailNow, which must only be
		// called from the goroutine running the test function.
		assert.NoError(t, err)
	}()

	// Watcher: runs the plan until plan.Stop is called below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := plan.Run(a.HTTPAddr()); err != nil {
			// Errorf, not Fatalf, for the same FailNow restriction.
			t.Errorf("err: %v", err)
		}
	}()

	if err := <-invoke; err != nil {
		t.Fatalf("err: %v", err)
	}

	plan.Stop()
	wg.Wait()
}
// mustParse decodes the JSON object in q into a parameter map and hands it to
// watch.Parse, returning the resulting plan. Either step failing aborts the
// calling test; t.Helper makes the failure point at the caller's line.
func mustParse(t *testing.T, q string) *watch.Plan {
	t.Helper()

	var spec map[string]interface{}
	decodeErr := json.Unmarshal([]byte(q), &spec)
	if decodeErr != nil {
		t.Fatal(decodeErr)
	}

	parsed, parseErr := watch.Parse(spec)
	if parseErr != nil {
		t.Fatalf("err: %v", parseErr)
	}
	return parsed
}