diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index 5706ac068e..fa64456b01 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -292,70 +292,74 @@ func TestCatalogNodes_Blocking(t *testing.T) { t.Parallel() a := NewTestAgent(t, t.Name(), "") defer a.Shutdown() - testrpc.WaitForTestAgent(t, a.RPC, "dc1") + testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WaitForAntiEntropySync()) - // Register node + // Run the query args := &structs.DCSpecificRequest{ Datacenter: "dc1", } - var out structs.IndexedNodes if err := a.RPC("Catalog.ListNodes", *args, &out); err != nil { t.Fatalf("err: %v", err) } - // t.Fatal must be called from the main go routine - // of the test. Because of this we cannot call - // t.Fatal from within the go routines and use - // an error channel instead. - errch := make(chan error, 2) + // Async cause a change + waitIndex := out.Index + start := time.Now() go func() { - testrpc.WaitForTestAgent(t, a.RPC, "dc1") - start := time.Now() - - // register a service after the blocking call - // in order to unblock it. - time.AfterFunc(100*time.Millisecond, func() { - args := &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - } - var out struct{} - errch <- a.RPC("Catalog.Register", args, &out) - }) - - // now block - req, _ := http.NewRequest("GET", fmt.Sprintf("/v1/catalog/nodes?wait=3s&index=%d", out.Index+1), nil) - resp := httptest.NewRecorder() - obj, err := a.srv.CatalogNodes(resp, req) - if err != nil { - errch <- err + time.Sleep(100 * time.Millisecond) + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", } - - // Should block for a while - if d := time.Since(start); d < 50*time.Millisecond { - errch <- fmt.Errorf("too fast: %v", d) + var out struct{} + if err := a.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) } - - if idx := getIndex(t, resp); idx <= out.Index { - errch <- fmt.Errorf("bad: %v", idx) - } - - nodes := obj.(structs.Nodes) - if len(nodes) != 2 { - errch <- fmt.Errorf("bad: %v", obj) - } - errch <- nil }() - // wait for both go routines to return - if err := <-errch; err != nil { - t.Fatal(err) + const waitDuration = 3 * time.Second + + // Re-run the query, if errantly woken up with no change, resume blocking. + var elapsed time.Duration +RUN_BLOCKING_QUERY: + req, err := http.NewRequest("GET", fmt.Sprintf("/v1/catalog/nodes?wait=%s&index=%d", + waitDuration.String(), + waitIndex), nil) + if err != nil { + t.Fatalf("err: %v", err) } - if err := <-errch; err != nil { - t.Fatal(err) + resp := httptest.NewRecorder() + obj, err := a.srv.CatalogNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) } + + elapsed = time.Since(start) + + idx := getIndex(t, resp) + if idx < waitIndex { + t.Fatalf("bad: %v", idx) + } else if idx == waitIndex { + if elapsed > waitDuration { + // This should prevent the loop from running longer than the + // waitDuration + t.Fatalf("too slow: %v", elapsed) + } + goto RUN_BLOCKING_QUERY + } + + // Should block at least 100ms before getting the changed results + if elapsed < 100*time.Millisecond { + t.Fatalf("too fast: %v", elapsed) + } + + nodes := obj.(structs.Nodes) + if len(nodes) != 2 { + t.Fatalf("bad: %v", obj) + } + } func TestCatalogNodes_DistanceSort(t *testing.T) {