mirror of https://github.com/status-im/consul.git
Test an index=0 value in cache.Notify
This commit is contained in:
parent
cba47aa0ca
commit
43bfc20dc8
|
@ -2092,8 +2092,10 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error {
|
||||||
return fmt.Errorf("ServiceID missing")
|
return fmt.Errorf("ServiceID missing")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shut down the config watch in the service manager.
|
// Shut down the config watch in the service manager if enabled.
|
||||||
|
if a.config.EnableCentralServiceConfig {
|
||||||
a.serviceManager.RemoveService(serviceID)
|
a.serviceManager.RemoveService(serviceID)
|
||||||
|
}
|
||||||
|
|
||||||
checks := a.State.Checks()
|
checks := a.State.Checks()
|
||||||
var checkIDs []types.CheckID
|
var checkIDs []types.CheckID
|
||||||
|
|
|
@ -2,6 +2,7 @@ package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -23,11 +24,15 @@ func TestCacheNotify(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Setup triggers to control when "updates" should be delivered
|
// Setup triggers to control when "updates" should be delivered
|
||||||
trigger := make([]chan time.Time, 4)
|
trigger := make([]chan time.Time, 5)
|
||||||
for i := range trigger {
|
for i := range trigger {
|
||||||
trigger[i] = make(chan time.Time)
|
trigger[i] = make(chan time.Time)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send an error to fake a situation where the servers aren't reachable
|
||||||
|
// initially.
|
||||||
|
typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once()
|
||||||
|
|
||||||
// Configure the type
|
// Configure the type
|
||||||
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
|
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
|
||||||
// Assert the right request type - all real Fetch implementations do this so
|
// Assert the right request type - all real Fetch implementations do this so
|
||||||
|
@ -35,16 +40,16 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// break in real life (hint: it did on the first attempt)
|
// break in real life (hint: it did on the first attempt)
|
||||||
_, ok := args.Get(1).(*MockRequest)
|
_, ok := args.Get(1).(*MockRequest)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
})
|
}).WaitUntil(trigger[0])
|
||||||
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[0])
|
|
||||||
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1])
|
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1])
|
||||||
typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[2])
|
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[2])
|
||||||
|
typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[3])
|
||||||
// It's timing dependent whether the blocking loop manages to make another
|
// It's timing dependent whether the blocking loop manages to make another
|
||||||
// call before we cancel so don't require it. We need to have a higher index
|
// call before we cancel so don't require it. We need to have a higher index
|
||||||
// here because if the index is the same then the cache Get will not return
|
// here because if the index is the same then the cache Get will not return
|
||||||
// until the full 10 min timeout expires. This causes the last fetch to return
|
// until the full 10 min timeout expires. This causes the last fetch to return
|
||||||
// after cancellation as if it had timed out.
|
// after cancellation as if it had timed out.
|
||||||
typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[3])
|
typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[4])
|
||||||
|
|
||||||
require := require.New(t)
|
require := require.New(t)
|
||||||
|
|
||||||
|
@ -56,12 +61,12 @@ func TestCacheNotify(t *testing.T) {
|
||||||
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
|
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
|
||||||
// Should receive the first result pretty soon
|
// Should receive the error with index == 0 first.
|
||||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
CorrelationID: "test",
|
CorrelationID: "test",
|
||||||
Result: 1,
|
Result: nil,
|
||||||
Meta: ResultMeta{Hit: false, Index: 4},
|
Meta: ResultMeta{Hit: false, Index: 0},
|
||||||
Err: nil,
|
Err: errors.New("no servers available"),
|
||||||
})
|
})
|
||||||
|
|
||||||
// There should be no more updates delivered yet
|
// There should be no more updates delivered yet
|
||||||
|
@ -70,6 +75,17 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// Trigger blocking query to return a "change"
|
// Trigger blocking query to return a "change"
|
||||||
close(trigger[0])
|
close(trigger[0])
|
||||||
|
|
||||||
|
// Should receive the first real update next.
|
||||||
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
|
CorrelationID: "test",
|
||||||
|
Result: 1,
|
||||||
|
Meta: ResultMeta{Hit: false, Index: 4},
|
||||||
|
Err: nil,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Trigger blocking query to return a "change"
|
||||||
|
close(trigger[1])
|
||||||
|
|
||||||
// Should receive the next result pretty soon
|
// Should receive the next result pretty soon
|
||||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
CorrelationID: "test",
|
CorrelationID: "test",
|
||||||
|
@ -99,7 +115,7 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// We could wait for a full timeout but we can't directly observe it so
|
// We could wait for a full timeout but we can't directly observe it so
|
||||||
// simulate the behavior by triggering a response with the same value and
|
// simulate the behavior by triggering a response with the same value and
|
||||||
// index as the last one.
|
// index as the last one.
|
||||||
close(trigger[1])
|
close(trigger[2])
|
||||||
|
|
||||||
// We should NOT be notified about that. Note this is timing dependent but
|
// We should NOT be notified about that. Note this is timing dependent but
|
||||||
// it's only a sanity check, if we somehow _do_ get the change delivered later
|
// it's only a sanity check, if we somehow _do_ get the change delivered later
|
||||||
|
@ -108,7 +124,7 @@ func TestCacheNotify(t *testing.T) {
|
||||||
require.Len(ch, 0)
|
require.Len(ch, 0)
|
||||||
|
|
||||||
// Trigger final update
|
// Trigger final update
|
||||||
close(trigger[2])
|
close(trigger[3])
|
||||||
|
|
||||||
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
TestCacheNotifyChResult(t, ch, UpdateEvent{
|
||||||
CorrelationID: "test",
|
CorrelationID: "test",
|
||||||
|
@ -134,7 +150,7 @@ func TestCacheNotify(t *testing.T) {
|
||||||
// have no way to interrupt a blocking query. In practice it's fine to know
|
// have no way to interrupt a blocking query. In practice it's fine to know
|
||||||
// that after 10 mins max the blocking query will return and the resources
|
// that after 10 mins max the blocking query will return and the resources
|
||||||
// will be cleaned.
|
// will be cleaned.
|
||||||
close(trigger[3])
|
close(trigger[4])
|
||||||
|
|
||||||
// I want to test that canceling the context cleans up goroutines (which it
|
// I want to test that canceling the context cleans up goroutines (which it
|
||||||
// does from manual verification with debugger etc). I had a check based on a
|
// does from manual verification with debugger etc). I had a check based on a
|
||||||
|
|
|
@ -672,7 +672,7 @@ type RuntimeConfig struct {
|
||||||
// EnableCentralServiceConfig controls whether the agent should incorporate
|
// EnableCentralServiceConfig controls whether the agent should incorporate
|
||||||
// centralized config such as service-defaults into local service registrations.
|
// centralized config such as service-defaults into local service registrations.
|
||||||
//
|
//
|
||||||
// hcl: (enable)
|
// hcl: enable_central_service_config = (true|false)
|
||||||
EnableCentralServiceConfig bool
|
EnableCentralServiceConfig bool
|
||||||
|
|
||||||
// EnableDebug is used to enable various debugging features.
|
// EnableDebug is used to enable various debugging features.
|
||||||
|
|
Loading…
Reference in New Issue