From a7c78e637de013e96c2369eb5eb3828a146414e2 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 11:59:44 +0200 Subject: [PATCH 01/15] watch: use test agent instead of external Consul instance --- watch/funcs_test.go | 114 ++++++++++++++++++++++++-------------------- 1 file changed, 61 insertions(+), 53 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index c048a0256f..90a4a948cc 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -1,23 +1,19 @@ -package watch +package watch_test import ( - "os" + "encoding/json" "testing" "time" + "github.com/hashicorp/consul/agent" consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/watch" ) -var consulAddr string - -func init() { - consulAddr = os.Getenv("CONSUL_ADDR") -} - func TestKeyWatch(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -37,7 +33,7 @@ func TestKeyWatch(t *testing.T) { defer plan.Stop() time.Sleep(20 * time.Millisecond) - kv := plan.client.KV() + kv := a.Client().KV() pair := &consulapi.KVPair{ Key: "foo/bar/baz", Value: []byte("test"), @@ -58,7 +54,7 @@ func TestKeyWatch(t *testing.T) { } }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -69,9 +65,9 @@ func TestKeyWatch(t *testing.T) { } func TestKeyWatch_With_PrefixDelete(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) invoke := 0 deletedKeyWatchInvoked := 0 @@ -93,7 +89,7 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { defer plan.Stop() time.Sleep(20 * time.Millisecond) - kv := plan.client.KV() + kv := a.Client().KV() pair := &consulapi.KVPair{ Key: "foo/bar/baz", Value: []byte("test"), @@ -115,7 +111,7 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { plan.Stop() }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -129,9 +125,9 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { } func TestKeyPrefixWatch(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -154,7 +150,7 @@ func TestKeyPrefixWatch(t *testing.T) { defer plan.Stop() time.Sleep(20 * time.Millisecond) - kv := plan.client.KV() + kv := a.Client().KV() pair := &consulapi.KVPair{ Key: "foo/bar", } @@ -174,7 +170,7 @@ func TestKeyPrefixWatch(t *testing.T) { } }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -185,9 +181,9 @@ func TestKeyPrefixWatch(t *testing.T) { } func TestServicesWatch(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"services"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -207,7 +203,7 @@ func TestServicesWatch(t *testing.T) { time.Sleep(20 * time.Millisecond) plan.Stop() - agent := plan.client.Agent() + agent := a.Client().Agent() reg := &consulapi.AgentServiceRegistration{ ID: "foo", Name: "foo", @@ -217,7 +213,7 @@ func TestServicesWatch(t *testing.T) { agent.ServiceDeregister("foo") }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -228,9 +224,9 @@ func TestServicesWatch(t *testing.T) { } func TestNodesWatch(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"nodes"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -250,7 +246,7 @@ func TestNodesWatch(t *testing.T) { time.Sleep(20 * time.Millisecond) plan.Stop() - catalog := plan.client.Catalog() + catalog := a.Client().Catalog() reg := &consulapi.CatalogRegistration{ Node: "foobar", Address: "1.1.1.1", @@ -266,7 +262,7 @@ func TestNodesWatch(t *testing.T) { catalog.Deregister(dereg, nil) }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -277,9 +273,9 @@ func TestNodesWatch(t *testing.T) { } func TestServiceWatch(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -301,7 +297,7 @@ func TestServiceWatch(t *testing.T) { go func() { time.Sleep(20 * time.Millisecond) - agent := plan.client.Agent() + agent := a.Client().Agent() reg := &consulapi.AgentServiceRegistration{ ID: "foo", Name: "foo", @@ -315,7 +311,7 @@ func TestServiceWatch(t *testing.T) { agent.ServiceDeregister("foo") }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -326,9 +322,9 @@ func TestServiceWatch(t *testing.T) { } func TestChecksWatch_State(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"checks", "state":"warning"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -350,7 +346,7 @@ func TestChecksWatch_State(t *testing.T) { go func() { time.Sleep(20 * time.Millisecond) - catalog := plan.client.Catalog() + catalog := a.Client().Catalog() reg := &consulapi.CatalogRegistration{ Node: "foobar", Address: "1.1.1.1", @@ -375,7 +371,7 @@ func TestChecksWatch_State(t *testing.T) { catalog.Deregister(dereg, nil) }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -386,9 +382,9 @@ func TestChecksWatch_State(t *testing.T) { } func TestChecksWatch_Service(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"checks", "service":"foobar"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -410,7 +406,7 @@ func TestChecksWatch_Service(t *testing.T) { go func() { time.Sleep(20 * time.Millisecond) - catalog := plan.client.Catalog() + catalog := a.Client().Catalog() reg := &consulapi.CatalogRegistration{ Node: "foobar", Address: "1.1.1.1", @@ -443,7 +439,7 @@ func TestChecksWatch_Service(t *testing.T) { catalog.Deregister(dereg, nil) }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -454,9 +450,9 @@ func TestChecksWatch_Service(t *testing.T) { } func TestEventWatch(t *testing.T) { - if consulAddr == "" { - t.Skip() - } + a := agent.NewTestAgent(t.Name(), ``) + defer a.Shutdown() + plan := mustParse(t, `{"type":"event", "name": "foo"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { @@ -476,14 +472,14 @@ func TestEventWatch(t *testing.T) { defer plan.Stop() time.Sleep(20 * time.Millisecond) - event := plan.client.Event() + event := a.Client().Event() params := &consulapi.UserEvent{Name: "foo"} if _, _, err := event.Fire(params, nil); err != nil { t.Fatalf("err: %v", err) } }() - err := plan.Run(consulAddr) + err := plan.Run(a.HTTPAddr()) if err != nil { t.Fatalf("err: %v", err) } @@ -492,3 +488,15 @@ func TestEventWatch(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func mustParse(t *testing.T, q string) *watch.Plan { + var params map[string]interface{} + if err := json.Unmarshal([]byte(q), ¶ms); err != nil { + t.Fatal(err) + } + plan, err := watch.Parse(params) + if err != nil { + t.Fatalf("err: %v", err) + } + return plan +} From 375fbcb643d318cc0f978ac89753bcb41d736d72 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 12:48:05 +0200 Subject: [PATCH 02/15] watch: convert TestNodesWatch to use channels --- watch/funcs_test.go | 55 ++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 90a4a948cc..86786e781a 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -2,6 +2,8 @@ package watch_test import ( "encoding/json" + "errors" + "sync" "testing" "time" @@ -10,6 +12,8 @@ import ( "github.com/hashicorp/consul/watch" ) +var errBadContent = errors.New("bad content") + func TestKeyWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -227,49 +231,50 @@ func TestNodesWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"nodes"}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.([]*consulapi.Node) - if !ok || len(v) == 0 { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return // ignore } + v, ok := raw.([]*consulapi.Node) + if !ok || len(v) == 0 { + return // ignore + } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - time.Sleep(20 * time.Millisecond) - plan.Stop() - + defer wg.Done() catalog := a.Client().Catalog() + + time.Sleep(20 * time.Millisecond) reg := &consulapi.CatalogRegistration{ Node: "foobar", Address: "1.1.1.1", Datacenter: "dc1", } - catalog.Register(reg, nil) - time.Sleep(20 * time.Millisecond) - dereg := &consulapi.CatalogDeregistration{ - Node: "foobar", - Address: "1.1.1.1", - Datacenter: "dc1", + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) } - catalog.Deregister(dereg, nil) }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func TestServiceWatch(t *testing.T) { From ef4e8b58114c39fe3a0dd6b481c5045a56043f3a Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 14:23:57 +0200 Subject: [PATCH 03/15] watch: convert TestServiceWatch to use channels --- watch/funcs_test.go | 58 +++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 86786e781a..f883342bf2 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -281,49 +281,55 @@ func TestServiceWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.([]*consulapi.ServiceEntry) - if ok && len(v) == 0 { - return - } - if !ok || v[0].Service.ID != "foo" { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return // ignore } + v, ok := raw.([]*consulapi.ServiceEntry) + if !ok || len(v) == 0 { + return // ignore + } + if v[0].Service.ID != "foo" { + invoke <- errBadContent + return + } + invoke <- nil } - go func() { - time.Sleep(20 * time.Millisecond) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() agent := a.Client().Agent() + + time.Sleep(20 * time.Millisecond) reg := &consulapi.AgentServiceRegistration{ ID: "foo", Name: "foo", Tags: []string{"bar"}, } - agent.ServiceRegister(reg) - - time.Sleep(20 * time.Millisecond) - plan.Stop() - - agent.ServiceDeregister("foo") + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func TestChecksWatch_State(t *testing.T) { From eb2963345ed17224d535bb72dc51e3ef902bf998 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 14:30:58 +0200 Subject: [PATCH 04/15] watch: convert TestServicesWatch to use channels --- watch/funcs_test.go | 52 +++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index f883342bf2..d03c7cc3ed 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -188,43 +188,53 @@ func TestServicesWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"services"}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.(map[string][]string) - if !ok || v["consul"] == nil { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return // ignore } + v, ok := raw.(map[string][]string) + if !ok || len(v) == 0 { + return // ignore + } + if v["consul"] == nil { + invoke <- errBadContent + return + } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - time.Sleep(20 * time.Millisecond) - plan.Stop() - + defer wg.Done() agent := a.Client().Agent() + + time.Sleep(20 * time.Millisecond) reg := &consulapi.AgentServiceRegistration{ ID: "foo", Name: "foo", } - agent.ServiceRegister(reg) - time.Sleep(20 * time.Millisecond) - agent.ServiceDeregister("foo") + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func TestNodesWatch(t *testing.T) { From d57ef823f9892d9c9e821f21571f04be1ba4057a Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 14:39:26 +0200 Subject: [PATCH 05/15] watch: convert TestChecksWatch_State to use channels --- watch/funcs_test.go | 60 ++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index d03c7cc3ed..68ddae20a8 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -346,28 +346,30 @@ func TestChecksWatch_State(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"checks", "state":"warning"}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.([]*consulapi.HealthCheck) - if len(v) == 0 { - return - } - if !ok || v[0].CheckID != "foobar" { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return // ignore } + v, ok := raw.([]*consulapi.HealthCheck) + if !ok || len(v) == 0 { + return // ignore + } + if v[0].CheckID != "foobar" || v[0].Status != "warning" { + invoke <- errBadContent + return + } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - time.Sleep(20 * time.Millisecond) - + defer wg.Done() catalog := a.Client().Catalog() + + time.Sleep(20 * time.Millisecond) reg := &consulapi.CatalogRegistration{ Node: "foobar", Address: "1.1.1.1", @@ -379,27 +381,25 @@ func TestChecksWatch_State(t *testing.T) { Status: consulapi.HealthWarning, }, } - catalog.Register(reg, nil) - - time.Sleep(20 * time.Millisecond) - plan.Stop() - - dereg := &consulapi.CatalogDeregistration{ - Node: "foobar", - Address: "1.1.1.1", - Datacenter: "dc1", + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) } - catalog.Deregister(dereg, nil) }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func TestChecksWatch_Service(t *testing.T) { From 7a84f1c82d94ffa3a9d429f5b26d4925100ed896 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 14:55:04 +0200 Subject: [PATCH 06/15] watch: convert TestChecksWatch_Service to use channels --- watch/funcs_test.go | 61 +++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 68ddae20a8..2e9e992e98 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -406,28 +406,30 @@ func TestChecksWatch_Service(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"checks", "service":"foobar"}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.([]*consulapi.HealthCheck) - if len(v) == 0 { - return - } - if !ok || v[0].CheckID != "foobar" { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return // ignore } + v, ok := raw.([]*consulapi.HealthCheck) + if !ok || len(v) == 0 { + return // ignore + } + if v[0].CheckID != "foobar" { + invoke <- errBadContent + return + } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - time.Sleep(20 * time.Millisecond) - + defer wg.Done() catalog := a.Client().Catalog() + + time.Sleep(20 * time.Millisecond) reg := &consulapi.CatalogRegistration{ Node: "foobar", Address: "1.1.1.1", @@ -444,30 +446,25 @@ func TestChecksWatch_Service(t *testing.T) { ServiceID: "foobar", }, } - _, err := catalog.Register(reg, nil) - if err != nil { + if _, err := catalog.Register(reg, nil); err != nil { t.Fatalf("err: %v", err) } - - time.Sleep(20 * time.Millisecond) - plan.Stop() - - dereg := &consulapi.CatalogDeregistration{ - Node: "foobar", - Address: "1.1.1.1", - Datacenter: "dc1", - } - catalog.Deregister(dereg, nil) }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func TestEventWatch(t *testing.T) { From bad870dc6874d1b143c189ea2960630d8a24844a Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 15:02:25 +0200 Subject: [PATCH 07/15] watch: convert TestKeyWatch to use channels --- watch/funcs_test.go | 59 ++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 2e9e992e98..5e61ac3af4 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -18,54 +18,53 @@ func TestKeyWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.(*consulapi.KVPair) - if !ok || v == nil || string(v.Value) != "test" { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return // ignore } + v, ok := raw.(*consulapi.KVPair) + if !ok || v == nil { + return // ignore + } + if string(v.Value) != "test" { + invoke <- errBadContent + return + } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - defer plan.Stop() - time.Sleep(20 * time.Millisecond) - + defer wg.Done() kv := a.Client().KV() + + time.Sleep(20 * time.Millisecond) pair := &consulapi.KVPair{ Key: "foo/bar/baz", Value: []byte("test"), } - _, err := kv.Put(pair, nil) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Wait for the query to run - time.Sleep(20 * time.Millisecond) - plan.Stop() - - // Delete the key - _, err = kv.Delete("foo/bar/baz", nil) - if err != nil { + if _, err := kv.Put(pair, nil); err != nil { t.Fatalf("err: %v", err) } }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func TestKeyWatch_With_PrefixDelete(t *testing.T) { From 144b337aa5a3c1ba813b1850eb3412c0e16fa43f Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 15:07:44 +0200 Subject: [PATCH 08/15] watch: convert TestKeyWatch_With_PrefixDelete to use channels --- watch/funcs_test.go | 63 ++++++++++++++++++++------------------------- 1 file changed, 28 insertions(+), 35 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 5e61ac3af4..a24705d978 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -71,60 +71,53 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) - invoke := 0 - deletedKeyWatchInvoked := 0 plan.Handler = func(idx uint64, raw interface{}) { - if raw == nil && deletedKeyWatchInvoked == 0 { - deletedKeyWatchInvoked++ + if raw == nil { + return // ignore + } + v, ok := raw.(*consulapi.KVPair) + if !ok || v == nil { + return // ignore + } + if string(v.Value) != "test" { + invoke <- errBadContent return } - if invoke == 0 { - v, ok := raw.(*consulapi.KVPair) - if !ok || v == nil || string(v.Value) != "test" { - t.Fatalf("Bad: %#v", raw) - } - invoke++ - } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - defer plan.Stop() - time.Sleep(20 * time.Millisecond) - + defer wg.Done() kv := a.Client().KV() + + time.Sleep(20 * time.Millisecond) pair := &consulapi.KVPair{ Key: "foo/bar/baz", Value: []byte("test"), } - _, err := kv.Put(pair, nil) - if err != nil { + if _, err := kv.Put(pair, nil); err != nil { t.Fatalf("err: %v", err) } - - // Wait for the query to run - time.Sleep(20 * time.Millisecond) - - // Delete the key - _, err = kv.DeleteTree("foo/bar", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - - plan.Stop() }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke != 1 { - t.Fatalf("expected watch plan to be invoked once but got %v", invoke) - } - if deletedKeyWatchInvoked != 1 { - t.Fatalf("expected watch plan to be invoked once on delete but got %v", deletedKeyWatchInvoked) - } + plan.Stop() + wg.Wait() } func TestKeyPrefixWatch(t *testing.T) { From e9766c8bd4293768ad306b5893ec84b49dbde89f Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 15:12:08 +0200 Subject: [PATCH 09/15] watch: convert TestKeyPrefixWatch to use channels --- watch/funcs_test.go | 62 +++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index a24705d978..7c8ba8ed9b 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -124,56 +124,52 @@ func TestKeyPrefixWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.(consulapi.KVPairs) - if ok && v == nil { - return - } - if !ok || v == nil || string(v[0].Key) != "foo/bar" { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return // ignore } + v, ok := raw.(consulapi.KVPairs) + if !ok || len(v) == 0 { + return + } + if string(v[0].Key) != "foo/bar" { + invoke <- errBadContent + return + } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - defer plan.Stop() - time.Sleep(20 * time.Millisecond) - + defer wg.Done() kv := a.Client().KV() + + time.Sleep(20 * time.Millisecond) pair := &consulapi.KVPair{ Key: "foo/bar", } - _, err := kv.Put(pair, nil) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Wait for the query to run - time.Sleep(20 * time.Millisecond) - plan.Stop() - - // Delete the key - _, err = kv.Delete("foo/bar", nil) - if err != nil { + if _, err := kv.Put(pair, nil); err != nil { t.Fatalf("err: %v", err) } }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func TestServicesWatch(t *testing.T) { From a54b0994ed9228ca0ec3b8f1029476ddf38334c8 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 15:19:13 +0200 Subject: [PATCH 10/15] watch: convert TestEventWatch to use channels --- watch/funcs_test.go | 46 +++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 7c8ba8ed9b..b214707183 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -459,40 +459,50 @@ func TestEventWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() + invoke := make(chan error) plan := mustParse(t, `{"type":"event", "name": "foo"}`) - invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { - if invoke == 0 { - if raw == nil { - return - } - v, ok := raw.([]*consulapi.UserEvent) - if !ok || len(v) == 0 || string(v[len(v)-1].Name) != "foo" { - t.Fatalf("Bad: %#v", raw) - } - invoke++ + if raw == nil { + return } + v, ok := raw.([]*consulapi.UserEvent) + if !ok || len(v) == 0 { + return // ignore + } + if string(v[len(v)-1].Name) != "foo" { + invoke <- errBadContent + return + } + invoke <- nil } + var wg sync.WaitGroup + wg.Add(1) go func() { - defer plan.Stop() - time.Sleep(20 * time.Millisecond) - + defer wg.Done() event := a.Client().Event() + + time.Sleep(20 * time.Millisecond) params := &consulapi.UserEvent{Name: "foo"} if _, _, err := event.Fire(params, nil); err != nil { t.Fatalf("err: %v", err) } }() - err := plan.Run(a.HTTPAddr()) - if err != nil { + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(a.HTTPAddr()); err != nil { + t.Fatalf("err: %v", err) + } + }() + + if err := <-invoke; err != nil { t.Fatalf("err: %v", err) } - if invoke == 0 { - t.Fatalf("bad: %v", invoke) - } + plan.Stop() + wg.Wait() } func mustParse(t *testing.T, q string) *watch.Plan { From f55e234f2e6c69d3cc8fd69d8e3ee56423234185 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 15:24:57 +0200 Subject: [PATCH 11/15] watch: run tests in parallel --- watch/funcs_test.go | 9 +++++++++ watch/plan_test.go | 1 + watch/watch_test.go | 2 ++ 3 files changed, 12 insertions(+) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index b214707183..6ec781ee20 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -15,6 +15,7 @@ import ( var errBadContent = errors.New("bad content") func TestKeyWatch(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -68,6 +69,7 @@ func TestKeyWatch(t *testing.T) { } func TestKeyWatch_With_PrefixDelete(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -121,6 +123,7 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { } func TestKeyPrefixWatch(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -173,6 +176,7 @@ func TestKeyPrefixWatch(t *testing.T) { } func TestServicesWatch(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -226,6 +230,7 @@ func TestServicesWatch(t *testing.T) { } func TestNodesWatch(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -276,6 +281,7 @@ func TestNodesWatch(t *testing.T) { } func TestServiceWatch(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -331,6 +337,7 @@ func TestServiceWatch(t *testing.T) { } func TestChecksWatch_State(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -391,6 +398,7 @@ func TestChecksWatch_State(t *testing.T) { } func TestChecksWatch_Service(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -456,6 +464,7 @@ func TestChecksWatch_Service(t *testing.T) { } func TestEventWatch(t *testing.T) { + t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() diff --git a/watch/plan_test.go b/watch/plan_test.go index 4bb7d80389..16e4cfbc21 100644 --- a/watch/plan_test.go +++ b/watch/plan_test.go @@ -27,6 +27,7 @@ func mustParse(t *testing.T, q string) *Plan { } func TestRun_Stop(t *testing.T) { + t.Parallel() plan := mustParse(t, `{"type":"noop"}`) var expect uint64 = 1 diff --git a/watch/watch_test.go b/watch/watch_test.go index f4597b46f3..e3dc32ef1d 100644 --- a/watch/watch_test.go +++ b/watch/watch_test.go @@ -7,6 +7,7 @@ import ( ) func TestParseBasic(t *testing.T) { + t.Parallel() params := makeParams(t, `{"type":"key", "datacenter":"dc2", "token":"12345", "key":"foo"}`) p, err := Parse(params) if err != nil { @@ -24,6 +25,7 @@ func TestParseBasic(t *testing.T) { } func TestParse_exempt(t *testing.T) { + t.Parallel() params := makeParams(t, `{"type":"key", "key":"foo", "handler": "foobar"}`) p, err := ParseExempt(params, []string{"handler"}) if err != nil { From 570ddaae9809d493bc2afb3a11222591234c7907 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 16:28:20 +0200 Subject: [PATCH 12/15] watch: make sure invoke channels timeout evenutally --- watch/funcs_test.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 6ec781ee20..190ae24faa 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -13,13 +13,22 @@ import ( ) var errBadContent = errors.New("bad content") +var errTimeout = errors.New("timeout") + +var timeout = 5 * time.Second + +func makeInvokeCh() chan error { + ch := make(chan error) + time.AfterFunc(timeout, func() { ch <- errTimeout }) + return ch +} func TestKeyWatch(t *testing.T) { t.Parallel() a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -73,7 +82,7 @@ func TestKeyWatch_With_PrefixDelete(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -127,7 +136,7 @@ func TestKeyPrefixWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -180,7 +189,7 @@ func TestServicesWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"services"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -234,7 +243,7 @@ func TestNodesWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"nodes"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -285,7 +294,7 @@ func TestServiceWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -341,7 +350,7 @@ func TestChecksWatch_State(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"checks", "state":"warning"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -402,7 +411,7 @@ func TestChecksWatch_Service(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"checks", "service":"foobar"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -468,7 +477,7 @@ func TestEventWatch(t *testing.T) { a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() - invoke := make(chan error) + invoke := makeInvokeCh() plan := mustParse(t, `{"type":"event", "name": "foo"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { From 96a584dee27c0285623d4e17ddaf3c0274dc1675 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 17:12:27 +0200 Subject: [PATCH 13/15] agent: fix TestRetryJoinWanFail --- command/agent/agent_test.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 8ba836f26a..94e15f5eb1 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -156,22 +156,17 @@ func TestRetryJoinFail(t *testing.T) { func TestRetryJoinWanFail(t *testing.T) { t.Parallel() - t.Skip("fs: skipping tests that use cmd.Run until signal handling is fixed") - cfg := agent.TestConfig() tmpDir := testutil.TempDir(t, "consul") defer os.RemoveAll(tmpDir) - shutdownCh := make(chan struct{}) - defer close(shutdownCh) - ui := cli.NewMockUi() - cmd := New(ui, "", "", "", "", shutdownCh) + cmd := New(ui, "", "", "", "", nil) args := []string{ "-server", - "-bind", cfg.BindAddr.String(), + "-bind", "127.0.0.1", "-data-dir", tmpDir, - "-retry-join-wan", cfg.SerfBindAddrWAN.String(), + "-retry-join-wan", "127.0.0.1:99", "-retry-max-wan", "1", "-retry-interval-wan", "10ms", } From 7d3467176b377411eaf5a8ccbcba937f9923e53d Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 17:13:37 +0200 Subject: [PATCH 14/15] agent: fix TestRetryJoinFail --- command/agent/agent_test.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 94e15f5eb1..9e0a3bddce 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -130,21 +130,16 @@ func TestRetryJoin(t *testing.T) { func TestRetryJoinFail(t *testing.T) { t.Parallel() - t.Skip("fs: skipping tests that use cmd.Run until signal handling is fixed") - cfg := agent.TestConfig() tmpDir := testutil.TempDir(t, "consul") defer os.RemoveAll(tmpDir) - shutdownCh := make(chan struct{}) - defer close(shutdownCh) - ui := cli.NewMockUi() - cmd := New(ui, "", "", "", "", shutdownCh) + cmd := New(ui, "", "", "", "", nil) args := []string{ - "-bind", cfg.BindAddr.String(), + "-bind", "127.0.0.1", "-data-dir", tmpDir, - "-retry-join", cfg.SerfBindAddrLAN.String(), + "-retry-join", "127.0.0.1:99", "-retry-max", "1", "-retry-interval", "10ms", } From 987247516709dfc352bbcd249bbfc432354232a0 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Tue, 24 Oct 2017 17:31:02 +0200 Subject: [PATCH 15/15] agent: fix TestRetryJoin --- command/agent/agent_test.go | 62 +++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 9e0a3bddce..f9d64dcaa2 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -2,11 +2,11 @@ package agent import ( "fmt" - "log" "os" "os/exec" "path/filepath" "strings" + "sync" "testing" "github.com/hashicorp/consul/agent" @@ -81,43 +81,42 @@ func TestConfigFail(t *testing.T) { func TestRetryJoin(t *testing.T) { t.Parallel() - t.Skip("fs: skipping tests that use cmd.Run until signal handling is fixed") a := agent.NewTestAgent(t.Name(), "") defer a.Shutdown() - cfg2 := agent.TestConfig() - tmpDir := testutil.TempDir(t, "consul") - defer os.RemoveAll(tmpDir) - - doneCh := make(chan struct{}) shutdownCh := make(chan struct{}) - defer func() { - close(shutdownCh) - <-doneCh - }() - - ui := cli.NewMockUi() - cmd := New(ui, "", version.Version, "", "", shutdownCh) - - args := []string{ - "-server", - "-bind", a.Config.BindAddr.String(), - "-data-dir", tmpDir, - "-node", fmt.Sprintf(`"%s"`, cfg2.NodeName), - "-advertise", a.Config.BindAddr.String(), - "-retry-join", a.Config.SerfBindAddrLAN.String(), - "-retry-interval", "1s", - "-retry-join-wan", a.Config.SerfBindAddrWAN.String(), - "-retry-interval-wan", "1s", - } - + var wg sync.WaitGroup + wg.Add(1) go func() { - if code := cmd.Run(args); code != 0 { - log.Printf("bad: %d", code) + defer wg.Done() + + tmpDir := testutil.TempDir(t, "consul") + defer os.RemoveAll(tmpDir) + + args := []string{ + "-server", + "-bind", a.Config.BindAddr.String(), + "-data-dir", tmpDir, + "-node", "Node 11111111-1111-1111-1111-111111111111", + "-node-id", "11111111-1111-1111-1111-111111111111", + "-advertise", a.Config.BindAddr.String(), + "-retry-join", a.Config.SerfBindAddrLAN.String(), + "-retry-interval", "1s", + "-retry-join-wan", a.Config.SerfBindAddrWAN.String(), + "-retry-interval-wan", "1s", + } + + ui := cli.NewMockUi() + cmd := New(ui, "", version.Version, "", "", shutdownCh) + // closing shutdownCh triggers a SIGINT which triggers shutdown without leave + // which will return 1 + if code := cmd.Run(args); code != 1 { + t.Log(ui.ErrorWriter.String()) + t.Fatalf("bad: %d", code) } - close(doneCh) }() + retry.Run(t, func(r *retry.R) { if got, want := len(a.LANMembers()), 2; got != want { r.Fatalf("got %d LAN members want %d", got, want) @@ -126,6 +125,9 @@ func TestRetryJoin(t *testing.T) { r.Fatalf("got %d WAN members want %d", got, want) } }) + + close(shutdownCh) + wg.Wait() } func TestRetryJoinFail(t *testing.T) {