mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 06:16:08 +00:00
Optimize health watching to single chan/goroutine. (#5449)
Refs #4984. Watching chans for every node we touch in a health query is wasteful. In #4984 it shows that if there are more than 682 service instances we always fallback to watching all services which kills performance. We already have a record in MemDB that is reliably update whenever the service health result should change thanks to per-service watch indexes. So in general, provided there is at least one service instances and we actually have a service index for it (we always do now) we only ever need to watch a single channel. This saves us from ever falling back to the general index and causing the performance cliff in #4984, but it also means fewer goroutines and work done for every blocking health query. It also saves some allocations made during the query because we no longer have to populate a WatchSet with 3 chans per service instance which saves the internal map allocation. This passes all state store tests except the one that explicitly checked for the fallback behaviour we've now optimized away and in general seems safe.
This commit is contained in:
parent
6458fdd876
commit
0b5a078b95
@ -955,22 +955,43 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
|
||||
// * block until an instance for this service is available, or another
|
||||
// service is unregistered.
|
||||
func maxIndexForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) uint64 {
|
||||
idx, _ := maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks)
|
||||
return idx
|
||||
}
|
||||
|
||||
// maxIndexAndWatchChForService return the maximum Raft Index for a service. If
|
||||
// the index is not set for the service, it will return the missing service
|
||||
// index. The service_last_extinction is set to the last raft index when a
|
||||
// service was unregistered (or 0 if no services were ever unregistered). This
|
||||
// allows blocking queries to
|
||||
// * return when the last instance of a service is removed
|
||||
// * block until an instance for this service is available, or another
|
||||
// service is unregistered.
|
||||
//
|
||||
// It also _may_ return a watch chan to add to a WatchSet. It will only return
|
||||
// one if the service exists, and has a service index. If it doesn't then nil is
|
||||
// returned for the chan. This allows for blocking watchers to _only_ watch this
|
||||
// one chan in the common case, falling back to watching all touched MemDB
|
||||
// indexes in more complicated cases.
|
||||
func maxIndexAndWatchChForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) (uint64, <-chan struct{}) {
|
||||
if !serviceExists {
|
||||
res, err := tx.First("index", "id", serviceLastExtinctionIndexName)
|
||||
if missingIdx, ok := res.(*IndexEntry); ok && err == nil {
|
||||
return missingIdx.Value
|
||||
// Not safe to only watch the extinction index as it's not updated when
|
||||
// new instances come along so return nil watchCh.
|
||||
return missingIdx.Value, nil
|
||||
}
|
||||
}
|
||||
|
||||
res, err := tx.First("index", "id", serviceIndexName(serviceName))
|
||||
ch, res, err := tx.FirstWatch("index", "id", serviceIndexName(serviceName))
|
||||
if idx, ok := res.(*IndexEntry); ok && err == nil {
|
||||
return idx.Value
|
||||
return idx.Value, ch
|
||||
}
|
||||
if checks {
|
||||
return maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
return maxIndexTxn(tx, "nodes", "services", "checks"), nil
|
||||
}
|
||||
|
||||
return maxIndexTxn(tx, "nodes", "services")
|
||||
return maxIndexTxn(tx, "nodes", "services"), nil
|
||||
}
|
||||
|
||||
// ConnectServiceNodes returns the nodes associated with a Connect
|
||||
@ -1870,7 +1891,8 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
ws.Add(iter.WatchCh())
|
||||
// Note we decide if we want to watch this iterator or not down below. We need
|
||||
// to see if it returned anything first.
|
||||
|
||||
// Return the results.
|
||||
var results structs.ServiceNodes
|
||||
@ -1879,9 +1901,31 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
|
||||
}
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexForService(tx, serviceName, len(results) > 0, true)
|
||||
idx, ch := maxIndexAndWatchChForService(tx, serviceName, len(results) > 0, true)
|
||||
|
||||
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
|
||||
// Create a nil watchset to pass below, we'll only pass the real one if we
|
||||
// need to. Nil watchers are safe/allowed and saves some allocation too.
|
||||
var fallbackWS memdb.WatchSet
|
||||
if ch == nil {
|
||||
// There was no explicit channel returned that corresponds to the service
|
||||
// index. That means we need to fallback to watching everything we touch in
|
||||
// the DB as normal. We plumb the caller's watchset through (note it's a map
|
||||
// so this is a by-reference assignment.)
|
||||
fallbackWS = ws
|
||||
// We also need to watch the iterator from earlier too.
|
||||
fallbackWS.Add(iter.WatchCh())
|
||||
} else {
|
||||
// There was a valid service index, and non-empty result. In this case it is
|
||||
// sufficient just to watch the service index's chan since that _must_ be
|
||||
// written to if the result of this method is going to change. This saves us
|
||||
// watching potentially thousands of watch chans for large services which
|
||||
// may need many goroutines. It also avoid the performance cliff that is hit
|
||||
// when watchLimit is hit (~682 service instances). See
|
||||
// https://github.com/hashicorp/consul/issues/4984
|
||||
ws.Add(ch)
|
||||
}
|
||||
|
||||
return s.parseCheckServiceNodes(tx, fallbackWS, idx, serviceName, results, err)
|
||||
}
|
||||
|
||||
// CheckServiceTagNodes is used to query all nodes and checks for a given
|
||||
|
@ -3021,43 +3021,11 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Overwhelm node and check tracking.
|
||||
idx = 13
|
||||
for i := 0; i < 2*watchLimit; i++ {
|
||||
node := fmt.Sprintf("many%d", i)
|
||||
testRegisterNode(t, s, idx, node)
|
||||
idx++
|
||||
testRegisterCheck(t, s, idx, node, "", "check1", api.HealthPassing)
|
||||
idx++
|
||||
testRegisterService(t, s, idx, node, "service1")
|
||||
idx++
|
||||
testRegisterCheck(t, s, idx, node, "service1", "check2", api.HealthPassing)
|
||||
idx++
|
||||
}
|
||||
|
||||
// Now registering an unrelated node will fire the watch.
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, results, err = s.CheckServiceNodes(ws, "service1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
testRegisterNode(t, s, idx, "more-nope")
|
||||
idx++
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// Also, registering an unrelated check will fire the watch.
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, results, err = s.CheckServiceNodes(ws, "service1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
testRegisterCheck(t, s, idx, "more-nope", "", "check1", api.HealthPassing)
|
||||
idx++
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
// Note that we can't overwhelm chan tracking any more since we optimized it
|
||||
// to only need to watch one chan in the happy path. The only path that does
|
||||
// bees to watch more stuff is where there are no service instances which also
|
||||
// means fewer than watchLimit chans too so effectively no way to trigger
|
||||
// Fallback watch any more.
|
||||
}
|
||||
|
||||
func TestStateStore_CheckConnectServiceNodes(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user