diff --git a/agent/agent.go b/agent/agent.go index da059801e6..71a1fb8a4b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2092,8 +2092,10 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error { return fmt.Errorf("ServiceID missing") } - // Shut down the config watch in the service manager. - a.serviceManager.RemoveService(serviceID) + // Shut down the config watch in the service manager if enabled. + if a.config.EnableCentralServiceConfig { + a.serviceManager.RemoveService(serviceID) + } checks := a.State.Checks() var checkIDs []types.CheckID diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go index 54b542fe6f..1995987ce3 100644 --- a/agent/cache/watch_test.go +++ b/agent/cache/watch_test.go @@ -2,6 +2,7 @@ package cache import ( "context" + "errors" "fmt" "sync/atomic" "testing" @@ -23,11 +24,15 @@ func TestCacheNotify(t *testing.T) { }) // Setup triggers to control when "updates" should be delivered - trigger := make([]chan time.Time, 4) + trigger := make([]chan time.Time, 5) for i := range trigger { trigger[i] = make(chan time.Time) } + // Send an error to fake a situation where the servers aren't reachable + // initially. + typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once() + // Configure the type typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) { // Assert the right request type - all real Fetch implementations do this so @@ -35,16 +40,16 @@ func TestCacheNotify(t *testing.T) { // break in real life (hint: it did on the first attempt) _, ok := args.Get(1).(*MockRequest) require.True(t, ok) - }) - typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[0]) + }).WaitUntil(trigger[0]) typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1]) - typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[2]) + typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[2]) + typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[3]) // It's timing dependent whether the blocking loop manages to make another // call before we cancel so don't require it. We need to have a higher index // here because if the index is the same then the cache Get will not return // until the full 10 min timeout expires. This causes the last fetch to return // after cancellation as if it had timed out. - typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[3]) + typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[4]) require := require.New(t) @@ -56,12 +61,12 @@ func TestCacheNotify(t *testing.T) { err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch) require.NoError(err) - // Should receive the first result pretty soon + // Should receive the error with index == 0 first. TestCacheNotifyChResult(t, ch, UpdateEvent{ CorrelationID: "test", - Result: 1, - Meta: ResultMeta{Hit: false, Index: 4}, - Err: nil, + Result: nil, + Meta: ResultMeta{Hit: false, Index: 0}, + Err: errors.New("no servers available"), }) // There should be no more updates delivered yet @@ -70,6 +75,17 @@ func TestCacheNotify(t *testing.T) { // Trigger blocking query to return a "change" close(trigger[0]) + // Should receive the first real update next. + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 1, + Meta: ResultMeta{Hit: false, Index: 4}, + Err: nil, + }) + + // Trigger blocking query to return a "change" + close(trigger[1]) + // Should receive the next result pretty soon TestCacheNotifyChResult(t, ch, UpdateEvent{ CorrelationID: "test", @@ -99,7 +115,7 @@ func TestCacheNotify(t *testing.T) { // We could wait for a full timeout but we can't directly observe it so // simulate the behavior by triggering a response with the same value and // index as the last one. - close(trigger[1]) + close(trigger[2]) // We should NOT be notified about that. Note this is timing dependent but // it's only a sanity check, if we somehow _do_ get the change delivered later @@ -108,7 +124,7 @@ func TestCacheNotify(t *testing.T) { require.Len(ch, 0) // Trigger final update - close(trigger[2]) + close(trigger[3]) TestCacheNotifyChResult(t, ch, UpdateEvent{ CorrelationID: "test", @@ -134,7 +150,7 @@ func TestCacheNotify(t *testing.T) { // have no way to interrupt a blocking query. In practice it's fine to know // that after 10 mins max the blocking query will return and the resources // will be cleaned. - close(trigger[3]) + close(trigger[4]) // I want to test that canceling the context cleans up goroutines (which it // does from manual verification with debugger etc). I had a check based on a diff --git a/agent/config/runtime.go b/agent/config/runtime.go index dc93b614d0..ba3689a2cd 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -672,7 +672,7 @@ type RuntimeConfig struct { // EnableCentralServiceConfig controls whether the agent should incorporate // centralized config such as service-defaults into local service registrations. // - // hcl: (enable) + // hcl: enable_central_service_config = (true|false) EnableCentralServiceConfig bool // EnableDebug is used to enable various debugging features.