From 9566df524e98f15e41a927c7d385b67f4966fea8 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Thu, 26 Sep 2019 10:42:17 -0500 Subject: [PATCH] agent: cache notifications work after error if the underlying RPC returns index=1 (#6547) Fixes #6521 Ensure that initial failures to fetch an agent cache entry using the notify API where the underlying RPC returns a synthetic index of 1 correctly recovers when those RPCs resume working. The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the index for the next query from 0 to 1 for all queries, when it should have not done so for queries that errored. Also fixed some things that made debugging difficult: - config entry read/list endpoints send back QueryMeta headers - xds event loops don't swallow the cache notification errors --- agent/agent_test.go | 98 +++++++++++++++++++++++++++++++++++++++ agent/cache/watch.go | 2 +- agent/cache/watch_test.go | 7 +-- agent/config_endpoint.go | 2 + agent/proxycfg/state.go | 36 ++++++++------ 5 files changed, 127 insertions(+), 18 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 5ebc4c3041..13916fb4ee 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -20,6 +20,8 @@ import ( "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/connect" @@ -4011,3 +4013,99 @@ func TestAgent_RerouteNewHTTPChecks(t *testing.T) { } }) } + +func TestAgentCache_serviceInConfigFile_initialFetchErrors_Issue6521(t *testing.T) { + t.Parallel() + + // Ensure that initial failures to fetch the discovery chain via the agent + // cache using the notify API for a service with no config entries + // correctly recovers when those RPCs resume working. The key here is that + // the lack of config entries guarantees that the RPC will come back with a + // synthetic index of 1. + // + // The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the + // index for the next query from 0 to 1 for all queries, when it should + // have not done so for queries that errored. + + a1 := NewTestAgent(t, t.Name()+"-a1", "") + defer a1.Shutdown() + testrpc.WaitForLeader(t, a1.RPC, "dc1") + + a2 := NewTestAgent(t, t.Name()+"-a2", ` + server = false + bootstrap = false +services { + name = "echo-client" + port = 8080 + connect { + sidecar_service { + proxy { + upstreams { + destination_name = "echo" + local_bind_port = 9191 + } + } + } + } +} + +services { + name = "echo" + port = 9090 + connect { + sidecar_service {} + } +} + `) + defer a2.Shutdown() + + // Starting a client agent disconnected from a server with services. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan cache.UpdateEvent, 1) + require.NoError(t, a2.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ + Datacenter: "dc1", + Name: "echo", + EvaluateInDatacenter: "dc1", + EvaluateInNamespace: "default", + }, "foo", ch)) + + { // The first event is an error because we are not joined yet. + evt := <-ch + require.Equal(t, "foo", evt.CorrelationID) + require.Nil(t, evt.Result) + require.Error(t, evt.Err) + require.Equal(t, evt.Err, structs.ErrNoServers) + } + + t.Logf("joining client to server") + + // Now connect to server + _, err := a1.JoinLAN([]string{ + fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN), + }) + require.NoError(t, err) + + t.Logf("joined client to server") + + deadlineCh := time.After(10 * time.Second) + start := time.Now() +LOOP: + for { + select { + case evt := <-ch: + // We may receive several notifications of an error until we get the + // first successful reply. + require.Equal(t, "foo", evt.CorrelationID) + if evt.Err != nil { + break LOOP + } + require.NoError(t, evt.Err) + require.NotNil(t, evt.Result) + t.Logf("took %s to get first success", time.Since(start)) + case <-deadlineCh: + t.Fatal("did not get notified successfully") + } + } +} diff --git a/agent/cache/watch.go b/agent/cache/watch.go index 47476c37b8..fca176fe08 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -126,7 +126,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co } } // Sanity check we always request blocking on second pass - if index < 1 { + if err == nil && index < 1 { index = 1 } } diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go index 38661374ec..cce8d35590 100644 --- a/agent/cache/watch_test.go +++ b/agent/cache/watch_test.go @@ -33,8 +33,9 @@ func TestCacheNotify(t *testing.T) { // initially. typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once() - // Configure the type - typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) { + // Configure the type. The first time we use the fake index of "1" to verify we + // don't regress on https://github.com/hashicorp/consul/issues/6521 . + typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) { // Assert the right request type - all real Fetch implementations do this so // it keeps us honest that Watch doesn't require type mangling which will // break in real life (hint: it did on the first attempt) @@ -79,7 +80,7 @@ func TestCacheNotify(t *testing.T) { TestCacheNotifyChResult(t, ch, UpdateEvent{ CorrelationID: "test", Result: 1, - Meta: ResultMeta{Hit: false, Index: 4}, + Meta: ResultMeta{Hit: false, Index: 1}, Err: nil, }) diff --git a/agent/config_endpoint.go b/agent/config_endpoint.go index 2f290e66ef..283f9f45e2 100644 --- a/agent/config_endpoint.go +++ b/agent/config_endpoint.go @@ -42,6 +42,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int if err := s.agent.RPC("ConfigEntry.Get", &args, &reply); err != nil { return nil, err } + setMeta(resp, &reply.QueryMeta) if reply.Entry == nil { return nil, NotFoundError{Reason: fmt.Sprintf("Config entry not found for %q / %q", pathArgs[0], pathArgs[1])} @@ -56,6 +57,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int if err := s.agent.RPC("ConfigEntry.List", &args, &reply); err != nil { return nil, err } + setMeta(resp, &reply.QueryMeta) return reply.Entries, nil default: diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 05bde60d3e..a83677f94c 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -470,18 +470,22 @@ func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error { } func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error { + if u.Err != nil { + return fmt.Errorf("error filling agent cache: %v", u.Err) + } + switch { case u.CorrelationID == rootsWatchID: roots, ok := u.Result.(*structs.IndexedCARoots) if !ok { - return fmt.Errorf("invalid type for roots response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } snap.Roots = roots case u.CorrelationID == leafWatchID: leaf, ok := u.Result.(*structs.IssuedCert) if !ok { - return fmt.Errorf("invalid type for leaf response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } snap.ConnectProxy.Leaf = leaf @@ -491,7 +495,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh case strings.HasPrefix(u.CorrelationID, "discovery-chain:"): resp, ok := u.Result.(*structs.DiscoveryChainResponse) if !ok { - return fmt.Errorf("invalid type for service response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:") snap.ConnectProxy.DiscoveryChain[svc] = resp.Chain @@ -503,7 +507,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh case strings.HasPrefix(u.CorrelationID, "upstream-target:"): resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) if !ok { - return fmt.Errorf("invalid type for service response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:") targetID, svc, ok := removeColonPrefix(correlationID) @@ -521,7 +525,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"): resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) if !ok { - return fmt.Errorf("invalid type for service response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:") dc, svc, ok := removeColonPrefix(correlationID) @@ -538,7 +542,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh case strings.HasPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix): resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) if !ok { - return fmt.Errorf("invalid type for service response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } svc := strings.TrimPrefix(u.CorrelationID, "upstream:"+serviceIDPrefix) snap.ConnectProxy.UpstreamEndpoints[svc] = resp.Nodes @@ -546,7 +550,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix): resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse) if !ok { - return fmt.Errorf("invalid type for prepared query response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } pq := strings.TrimPrefix(u.CorrelationID, "upstream:") snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes @@ -560,7 +564,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh snap.ConnectProxy.WatchedServiceChecks[svcID] = resp default: - return errors.New("unknown correlation ID") + return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID) } return nil } @@ -668,17 +672,21 @@ func (s *state) resetWatchesFromChain( } func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error { + if u.Err != nil { + return fmt.Errorf("error filling agent cache: %v", u.Err) + } + switch u.CorrelationID { case rootsWatchID: roots, ok := u.Result.(*structs.IndexedCARoots) if !ok { - return fmt.Errorf("invalid type for roots response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } snap.Roots = roots case serviceListWatchID: services, ok := u.Result.(*structs.IndexedServices) if !ok { - return fmt.Errorf("invalid type for services response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } for svcName := range services.Services { @@ -721,7 +729,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho case datacentersWatchID: datacentersRaw, ok := u.Result.(*[]string) if !ok { - return fmt.Errorf("invalid type for datacenters response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } if datacentersRaw == nil { return fmt.Errorf("invalid response with a nil datacenter list") @@ -771,7 +779,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho case serviceResolversWatchID: configEntries, ok := u.Result.(*structs.IndexedConfigEntries) if !ok { - return fmt.Errorf("invalid type for services response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } resolvers := make(map[string]*structs.ServiceResolverConfigEntry) @@ -786,7 +794,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho case strings.HasPrefix(u.CorrelationID, "connect-service:"): resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) if !ok { - return fmt.Errorf("invalid type for service response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } svc := strings.TrimPrefix(u.CorrelationID, "connect-service:") @@ -799,7 +807,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"): resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) if !ok { - return fmt.Errorf("invalid type for service response: %T", u.Result) + return fmt.Errorf("invalid type for response: %T", u.Result) } dc := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")