mirror of https://github.com/status-im/consul.git
Connect: Make Connect health queries unblock correctly (#5508)
* Make Connect health queries unblock correctly in all cases and use optimal number of watch chans. Fixes #5506. * Node check test cases and clearer bug test doc * Comment update
parent 77bd53269d
commit d2e68a900a
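For context on why the number and choice of watch channels matters: the state-store query fills a memdb.WatchSet, and the agent's blocking-query layer parks on that set and re-runs the query when any channel in it fires. The sketch below is a minimal illustration of that consumer side, not Consul's actual RPC plumbing; queryFn and blockingQuery are illustrative names, and only the go-memdb WatchSet API (NewWatchSet, Add via the query, Watch) is assumed from the real library.

```go
package example

import (
	"time"

	"github.com/hashicorp/go-memdb"
)

// queryFn stands in for a state-store read such as CheckConnectServiceNodes:
// it adds the channels that would invalidate its result to ws and returns the
// index the result was computed at.
type queryFn func(ws memdb.WatchSet) (uint64, error)

// blockingQuery re-runs query until its index moves past minIndex or the
// timeout expires, parking on the WatchSet between attempts. Every channel
// the query adds is something go-memdb must select on here, so watching one
// service-index channel per distinct service name (as the change below does)
// is far cheaper than watching a radix node per service instance.
func blockingQuery(minIndex uint64, timeout time.Duration, query queryFn) (uint64, error) {
	timeoutCh := time.After(timeout)
	for {
		ws := memdb.NewWatchSet()
		idx, err := query(ws)
		if err != nil || idx > minIndex {
			return idx, err
		}
		// Watch blocks until some watched channel fires (data changed) or the
		// timeout channel fires; it returns true on timeout.
		if timedOut := ws.Watch(timeoutCh); timedOut {
			return idx, nil
		}
	}
}
```

Past roughly 682 channels go-memdb falls off its fast path (the watchLimit cliff referenced in the comments below and in issue #4984), which is why the fix watches one index channel per service name plus a single Connect iterator channel instead of every node touched.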
@@ -1896,33 +1896,89 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
// Return the results.
var results structs.ServiceNodes

// For connect queries we need a list of any proxy service names in the result
// set. Rather than have different code paths for connect and non-connect, we
// use the same one in both cases. For non-empty non-connect results,
// serviceNames will always have exactly one element which is the same as
// serviceName. For Connect there might be multiple different service names -
// one for each service name a proxy is registered under, and the target
// service name IFF there is at least one Connect-native instance of that
// service. Either way there is usually only one distinct name if proxies are
// named consistently but could be multiple.
serviceNames := make(map[string]struct{}, 2)
for service := iter.Next(); service != nil; service = iter.Next() {
results = append(results, service.(*structs.ServiceNode))
sn := service.(*structs.ServiceNode)
results = append(results, sn)
serviceNames[sn.ServiceName] = struct{}{}
}

// Get the table index.
idx, ch := maxIndexAndWatchChForService(tx, serviceName, len(results) > 0, true)
// watchOptimized tracks if we meet the necessary condition to optimize
// WatchSet size. That is, every service name represented in the result
// set must have a service-specific index we can watch instead of many radix
// nodes for all the actual nodes touched. This saves us watching potentially
// thousands of watch chans for large services which may need many goroutines.
// It also avoids the performance cliff that is hit when watchLimit is hit
// (~682 service instances). See
// https://github.com/hashicorp/consul/issues/4984
watchOptimized := false
idx := uint64(0)
if len(serviceNames) > 0 {
// Assume optimization will work since it really should at this point. For
// safety we'll sanity check this below for each service name.
watchOptimized = true

// Fetch indexes for all named services in the result set.
for svcName := range serviceNames {
// We know service values should exist since the serviceNames map is only
// populated if there is at least one result above, so the serviceExists arg
// below is always true.
svcIdx, svcCh := maxIndexAndWatchChForService(tx, svcName, true, true)
// Take the max index represented
if idx < svcIdx {
idx = svcIdx
}
if svcCh != nil {
// Watch the service-specific index for changes in lieu of all iradix nodes
// for checks etc.
ws.Add(svcCh)
} else {
// Nil svcCh shouldn't really happen since all existent services should
// have a service-specific index but just in case it does due to a bug,
// fall back to the more expensive old way of watching every radix node
// we touch.
watchOptimized = false
}
}
} else {
// If we have no results, we should use the index of the last service
// extinction event so we don't go backwards when services de-register. We
// use target serviceName here but it actually doesn't matter. No chan will
// be returned as we can't use the optimization in this case (and don't need
// to as there is only one chan to watch anyway).
idx, _ = maxIndexAndWatchChForService(tx, serviceName, false, true)
}

// Create a nil watchset to pass below, we'll only pass the real one if we
// need to. Nil watchers are safe/allowed and save some allocation too.
var fallbackWS memdb.WatchSet
if ch == nil {
// There was no explicit channel returned that corresponds to the service
// index. That means we need to fallback to watching everything we touch in
// the DB as normal. We plumb the caller's watchset through (note it's a map
// so this is a by-reference assignment.)
if !watchOptimized {
// We weren't able to use the optimization of watching only service indexes
// for some reason. That means we need to fallback to watching everything we
// touch in the DB as normal. We plumb the caller's watchset through (note
// it's a map so this is a by-reference assignment.)
fallbackWS = ws
// We also need to watch the iterator from earlier too.
fallbackWS.Add(iter.WatchCh())
} else {
// There was a valid service index, and non-empty result. In this case it is
// sufficient just to watch the service index's chan since that _must_ be
// written to if the result of this method is going to change. This saves us
// watching potentially thousands of watch chans for large services which
// may need many goroutines. It also avoids the performance cliff that is hit
// when watchLimit is hit (~682 service instances). See
// https://github.com/hashicorp/consul/issues/4984
ws.Add(ch)
} else if connect {
// If this is a connect query then there is a subtlety to watch out for.
// In addition to watching the proxy service indexes for changes above, we
// need to still keep an eye on the connect service index in case a new
// proxy with a new name registers - we are only watching proxy service
// names we know about above so we'd miss that otherwise. Thankfully this
// is only ever one extra chan to watch and will catch any changes to
// proxy registrations for this target service.
ws.Add(iter.WatchCh())
}

return s.parseCheckServiceNodes(tx, fallbackWS, idx, serviceName, results, err)
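To make the regression this hunk fixes concrete, here is a condensed, test-style sketch of the #5506 scenario. It mirrors the "with target service present" case in the new table test further down and uses the helpers added later in this commit; it is an illustration, not code from the commit. With the target service registered under its own name, the old code took the blocking index and watch channel from that service's entry, so a check change on the proxy never moved the index and the watch never fired.

```go
func TestConnectQueryUnblocksOnProxyCheckChange(t *testing.T) {
	s := testStateStore(t)
	testRegisterNode(t, s, 1, "node1")
	testRegisterService(t, s, 2, "node1", "test")      // target service, not connect-enabled
	testRegisterSidecarProxy(t, s, 3, "node1", "test") // proxy fronting "test"
	testRegisterCheck(t, s, 4, "node1", "test-sidecar-proxy", "check1", "passing")

	// Run the Connect query once to populate the watch set.
	ws := memdb.NewWatchSet()
	_, _, err := s.CheckConnectServiceNodes(ws, "test")
	require.NoError(t, err)

	// Flip the proxy's check. The Connect result has changed, so the watch
	// set must fire; before this commit it did not, because only the target
	// service's index was being watched.
	testRegisterCheck(t, s, 5, "node1", "test-sidecar-proxy", "check1", "critical")
	require.True(t, watchFired(ws))
}
```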
@@ -2773,10 +2773,10 @@ func ensureIndexForService(t *testing.T, s *Store, ws memdb.WatchSet, serviceNam
}
}

// TestIndexIndependence tests that changes on a given service do not impact the
// TestStateStore_IndexIndependence tests that changes on a given service do not impact the
// index of other services. It allows huge benefits for watches since
// watchers are notified ONLY when there are changes in the given service
func TestIndexIndependence(t *testing.T) {
func TestStateStore_IndexIndependence(t *testing.T) {
s := testStateStore(t)

// Querying with no matches gives an empty response
@@ -2860,56 +2860,395 @@ func TestIndexIndependence(t *testing.T) {
}
}

func TestMissingServiceIndex(t *testing.T) {
s := testStateStore(t)
func TestStateStore_ConnectQueryBlocking(t *testing.T) {
tests := []struct {
name string
setupFn func(s *Store)
svc string
wantBeforeResLen int
wantBeforeWatchSetSize int
updateFn func(s *Store)
shouldFire bool
wantAfterIndex uint64
wantAfterResLen int
wantAfterWatchSetSize int
}{
{
name: "not affected by non-connect-enabled target service registration",
setupFn: nil,
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
testRegisterService(t, s, 4, "node1", "test")
},
shouldFire: false,
wantAfterIndex: 4, // No results falls back to global service index
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "not affected by non-connect-enabled target service de-registration",
setupFn: func(s *Store) {
testRegisterService(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(5, "node1", "test"))
},
// Note that the old implementation would unblock in this case since it
// always watched the target service's index even though some updates
// there don't affect Connect result output. This doesn't matter much for
// correctness but it causes pointless work.
shouldFire: false,
wantAfterIndex: 5, // No results falls back to global service index
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "unblocks on first connect-native service registration",
setupFn: nil,
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
},
shouldFire: true,
wantAfterIndex: 4,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on subsequent connect-native service registration",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 5, "node2", "test")
},
shouldFire: true,
wantAfterIndex: 5,
wantAfterResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on connect-native service de-registration",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
testRegisterConnectNativeService(t, s, 5, "node2", "test")
},
svc: "test",
wantBeforeResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node2", "test"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on last connect-native service de-registration",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node1", "test"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "unblocks on first proxy service registration",
setupFn: nil,
svc: "test",
wantBeforeResLen: 0,
// Only the connect index iterator is watched
wantBeforeWatchSetSize: 1,
updateFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
shouldFire: true,
wantAfterIndex: 4,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on subsequent proxy service registration",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 5, "node2", "test")
},
shouldFire: true,
wantAfterIndex: 5,
wantAfterResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on proxy service de-registration",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
testRegisterSidecarProxy(t, s, 5, "node2", "test")
},
svc: "test",
wantBeforeResLen: 2,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node2", "test-sidecar-proxy"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on last proxy service de-registration",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
require.NoError(t, s.DeleteService(6, "node1", "test-sidecar-proxy"))
},
shouldFire: true,
wantAfterIndex: 6,
wantAfterResLen: 0,
// Only the connect index iterator is watched
wantAfterWatchSetSize: 1,
},
{
name: "unblocks on connect-native service health check change",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "test", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on proxy service health check change",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "test-sidecar-proxy", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on connect-native node health check change",
setupFn: func(s *Store) {
testRegisterConnectNativeService(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
name: "unblocks on proxy node health check change",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
// See https://github.com/hashicorp/consul/issues/5506. The issue is caused
// if the target service exists and is registered, meaning it has a
// service-specific index. This index is then used for the connect query
// even though it is not updated by changes to the actual proxy or its
// checks. If the target service was never registered then it all appears
// to work because the code would not find a service index and so fall
// back to using the global service index which does change on any update
// to proxies.
name: "unblocks on proxy service health check change with target service present",
setupFn: func(s *Store) {
testRegisterService(t, s, 4, "node1", "test") // normal service
testRegisterSidecarProxy(t, s, 5, "node1", "test")
testRegisterCheck(t, s, 6, "node1", "test-sidecar-proxy", "check1", "passing")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
},
shouldFire: true,
wantAfterIndex: 7,
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantAfterWatchSetSize: 2,
},
{
// See https://github.com/hashicorp/consul/issues/5506. This is the edge
// case that the simple solution wouldn't catch.
name: "unblocks on different service name proxy-service registration when service is present",
setupFn: func(s *Store) {
testRegisterSidecarProxy(t, s, 4, "node1", "test")
},
svc: "test",
wantBeforeResLen: 1,
// Should take the optimized path where we only watch the service index
// and the connect index iterator.
wantBeforeWatchSetSize: 2,
updateFn: func(s *Store) {
// Register a new result with a different service name. It could be another
// proxy with a different name, but a native instance works too.
testRegisterConnectNativeService(t, s, 5, "node2", "test")
},
shouldFire: true,
wantAfterIndex: 5,
wantAfterResLen: 2,
// Should take the optimized path where we only watch the two service
// indexes and the connect index iterator.
wantAfterWatchSetSize: 3,
},
}
// Querying with no matches gives an empty response
ws := memdb.NewWatchSet()
idx, res, err := s.CheckServiceNodes(ws, "service1")
require.Nil(t, err)
require.Nil(t, res)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := testStateStore(t)

// index should be 0 for a non-existing service at startup
require.Equal(t, uint64(0), idx)
// Always create 3 nodes
testRegisterNode(t, s, 1, "node1")
testRegisterNode(t, s, 2, "node2")
testRegisterNode(t, s, 3, "node3")

testRegisterNode(t, s, 0, "node1")
// Setup
if tt.setupFn != nil {
tt.setupFn(s)
}

// node operations should not affect missing service index
ensureServiceVersion(t, s, ws, "service1", 0, 0)
require := require.New(t)

testRegisterService(t, s, 10, "node1", "service1")
ensureServiceVersion(t, s, ws, "service1", 10, 1)
// Run the query
ws := memdb.NewWatchSet()
idx, res, err := s.CheckConnectServiceNodes(ws, tt.svc)
require.NoError(err)
require.Len(res, tt.wantBeforeResLen)
require.Len(ws, tt.wantBeforeWatchSetSize)

s.DeleteService(11, "node1", "service1")
// service1 is now missing, its index is now that of the last index a service was
// deleted at
ensureServiceVersion(t, s, ws, "service1", 11, 0)
// Mutate the state store
if tt.updateFn != nil {
tt.updateFn(s)
}

testRegisterService(t, s, 12, "node1", "service2")
ensureServiceVersion(t, s, ws, "service2", 12, 1)
fired := watchFired(ws)
if tt.shouldFire {
require.True(fired, "WatchSet should have fired")
} else {
require.False(fired, "WatchSet should not have fired")
}

// missing service index does not change even though another service has been
// registered
ensureServiceVersion(t, s, ws, "service1", 11, 0)
ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0)

// registering a service on another node does not affect missing service
// index
testRegisterNode(t, s, 13, "node2")
testRegisterService(t, s, 14, "node2", "service3")
ensureServiceVersion(t, s, ws, "service3", 14, 1)
ensureServiceVersion(t, s, ws, "service1", 11, 0)

// unregistering a service bumps missing service index
s.DeleteService(15, "node2", "service3")
ensureServiceVersion(t, s, ws, "service3", 15, 0)
ensureServiceVersion(t, s, ws, "service2", 12, 1)
ensureServiceVersion(t, s, ws, "service1", 15, 0)
ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0)

// registering again a missing service correctly updates its index
testRegisterService(t, s, 16, "node1", "service1")
ensureServiceVersion(t, s, ws, "service1", 16, 1)
// Re-query the same result. Should return the desired index and len
ws = memdb.NewWatchSet()
idx, res, err = s.CheckConnectServiceNodes(ws, tt.svc)
require.NoError(err)
require.Len(res, tt.wantAfterResLen)
require.Equal(tt.wantAfterIndex, idx)
require.Len(ws, tt.wantAfterWatchSetSize)
})
}
}

func TestStateStore_CheckServiceNodes(t *testing.T) {
@@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)

func testUUID() string {
@@ -130,6 +131,32 @@ func testRegisterCheck(t *testing.T, s *Store, idx uint64,
}
}

func testRegisterSidecarProxy(t *testing.T, s *Store, idx uint64, nodeID string, targetServiceID string) {
svc := &structs.NodeService{
ID: targetServiceID + "-sidecar-proxy",
Service: targetServiceID + "-sidecar-proxy",
Port: 20000,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: targetServiceID,
DestinationServiceID: targetServiceID,
},
}
require.NoError(t, s.EnsureService(idx, nodeID, svc))
}

func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string) {
svc := &structs.NodeService{
ID: serviceID,
Service: serviceID,
Port: 1111,
Connect: structs.ServiceConnect{
Native: true,
},
}
require.NoError(t, s.EnsureService(idx, nodeID, svc))
}

func testSetKey(t *testing.T, s *Store, idx uint64, key, value string) {
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
if err := s.KVSSet(idx, entry); err != nil {