mirror of https://github.com/status-im/consul.git
Improve blocking queries on services that do not exist (#4810)
## Background When making a blocking query on a missing service (was never registered, or is not registered anymore) the query returns as soon as any service is updated. On clusters with frequent updates (5~10 updates/s in our DCs) these queries virtually do not block, and clients with no protections againt this waste ressources on the agent and server side. Clients that do protect against this get updates later than they should because of the backoff time they implement between requests. ## Implementation While reducing the number of unnecessary updates we still want : * Clients to be notified as soon as when the last instance of a service disapears. * Clients to be notified whenever there's there is an update for the service. * Clients to be notified as soon as the first instance of the requested service is added. To reduce the number of unnecessary updates we need to block when a request to a missing service is made. However in the following case : 1. Client `client1` makes a query for service `foo`, gets back a node and X-Consul-Index 42 2. `foo` is unregistered 3. `client1` makes a query for `foo` with `index=42` -> `foo` does not exist, the query blocks and `client1` is not notified of the change on `foo` We could store the last raft index when each service was last alive to know wether we should block on the incoming query or not, but that list could grow indefinetly. We instead store the last raft index when a service was unregistered and use it when a query targets a service that does not exist. When a service `srv` is unregistered this "missing service index" is always greater than any X-Consul-Index held by the clients while `srv` was up, allowing us to immediatly notify them. 1. Client `client1` makes a query for service `foo`, gets back a node and `X-Consul-Index: 42` 2. `foo` is unregistered, we set the "missing service index" to 43 3. `client1` makes a blocking query for `foo` with `index=42` -> `foo` does not exist, we check against the "missing service index" and return immediatly with `X-Consul-Index: 43` 4. `client1` makes a blocking query for `foo` with `index=43` -> we block 5. Other changes happen in the cluster, but foo still doesn't exist and "missing service index" hasn't changed, the query is still blocked 6. `foo` is registered again on index 62 -> `foo` exists and its index is greater than 43, we unblock the query
This commit is contained in:
parent
4db60f8243
commit
4afbe792df
|
@ -13,6 +13,10 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
servicesTableName = "services"
|
servicesTableName = "services"
|
||||||
|
|
||||||
|
// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
|
||||||
|
// of any service was unregistered. This is used by blocking queries on missing services.
|
||||||
|
serviceLastExtinctionIndexName = "service_last_extinction"
|
||||||
)
|
)
|
||||||
|
|
||||||
// nodesTableSchema returns a new table schema used for storing node
|
// nodesTableSchema returns a new table schema used for storing node
|
||||||
|
@ -841,19 +845,30 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
// maxIndexForService return the maximum Raft Index for a service
|
// maxIndexForService return the maximum Raft Index for a service
|
||||||
// If the index is not set for the service, it will return:
|
// If the index is not set for the service, it will return the missing
|
||||||
// - maxIndex(nodes, services) if checks is false
|
// service index.
|
||||||
// - maxIndex(nodes, services, checks) if checks is true
|
// The service_last_extinction is set to the last raft index when a service
|
||||||
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
|
// was unregistered (or 0 if no services were ever unregistered). This
|
||||||
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
|
// allows blocking queries to
|
||||||
if err == nil {
|
// * return when the last instance of a service is removed
|
||||||
if idx, ok := transaction.(*IndexEntry); ok {
|
// * block until an instance for this service is available, or another
|
||||||
return idx.Value
|
// service is unregistered.
|
||||||
|
func maxIndexForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) uint64 {
|
||||||
|
if !serviceExists {
|
||||||
|
res, err := tx.First("index", "id", serviceLastExtinctionIndexName)
|
||||||
|
if missingIdx, ok := res.(*IndexEntry); ok && err == nil {
|
||||||
|
return missingIdx.Value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res, err := tx.First("index", "id", serviceIndexName(serviceName))
|
||||||
|
if idx, ok := res.(*IndexEntry); ok && err == nil {
|
||||||
|
return idx.Value
|
||||||
|
}
|
||||||
if checks {
|
if checks {
|
||||||
return maxIndexTxn(tx, "nodes", "services", "checks")
|
return maxIndexTxn(tx, "nodes", "services", "checks")
|
||||||
}
|
}
|
||||||
|
|
||||||
return maxIndexTxn(tx, "nodes", "services")
|
return maxIndexTxn(tx, "nodes", "services")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -873,9 +888,6 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
|
||||||
idx := maxIndexForService(tx, serviceName, false)
|
|
||||||
|
|
||||||
// Function for lookup
|
// Function for lookup
|
||||||
var f func() (memdb.ResultIterator, error)
|
var f func() (memdb.ResultIterator, error)
|
||||||
if !connect {
|
if !connect {
|
||||||
|
@ -905,6 +917,10 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexForService(tx, serviceName, len(results) > 0, false)
|
||||||
|
|
||||||
return idx, results, nil
|
return idx, results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -914,9 +930,6 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
|
||||||
idx := maxIndexForService(tx, service, false)
|
|
||||||
|
|
||||||
// List all the services.
|
// List all the services.
|
||||||
services, err := tx.Get("services", "service", service)
|
services, err := tx.Get("services", "service", service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -925,9 +938,11 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
|
||||||
ws.Add(services.WatchCh())
|
ws.Add(services.WatchCh())
|
||||||
|
|
||||||
// Gather all the services and apply the tag filter.
|
// Gather all the services and apply the tag filter.
|
||||||
|
serviceExists := false
|
||||||
var results structs.ServiceNodes
|
var results structs.ServiceNodes
|
||||||
for service := services.Next(); service != nil; service = services.Next() {
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
svc := service.(*structs.ServiceNode)
|
svc := service.(*structs.ServiceNode)
|
||||||
|
serviceExists = true
|
||||||
if !serviceTagsFilter(svc, tags) {
|
if !serviceTagsFilter(svc, tags) {
|
||||||
results = append(results, svc)
|
results = append(results, svc)
|
||||||
}
|
}
|
||||||
|
@ -938,6 +953,9 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
||||||
}
|
}
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexForService(tx, service, serviceExists, false)
|
||||||
|
|
||||||
return idx, results, nil
|
return idx, results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1214,6 +1232,11 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
|
||||||
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating missing service index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
||||||
|
@ -1438,7 +1461,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := maxIndexForService(tx, serviceName, true)
|
idx := maxIndexForService(tx, serviceName, true, true)
|
||||||
// Return the checks.
|
// Return the checks.
|
||||||
iter, err := tx.Get("checks", "service", serviceName)
|
iter, err := tx.Get("checks", "service", serviceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1627,9 +1650,6 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
|
||||||
idx := maxIndexForService(tx, serviceName, true)
|
|
||||||
|
|
||||||
// Function for lookup
|
// Function for lookup
|
||||||
var f func() (memdb.ResultIterator, error)
|
var f func() (memdb.ResultIterator, error)
|
||||||
if !connect {
|
if !connect {
|
||||||
|
@ -1654,6 +1674,10 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
|
||||||
for service := iter.Next(); service != nil; service = iter.Next() {
|
for service := iter.Next(); service != nil; service = iter.Next() {
|
||||||
results = append(results, service.(*structs.ServiceNode))
|
results = append(results, service.(*structs.ServiceNode))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexForService(tx, serviceName, len(results) > 0, true)
|
||||||
|
|
||||||
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
|
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1663,9 +1687,6 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
|
||||||
idx := maxIndexForService(tx, serviceName, true)
|
|
||||||
|
|
||||||
// Query the state store for the service.
|
// Query the state store for the service.
|
||||||
iter, err := tx.Get("services", "service", serviceName)
|
iter, err := tx.Get("services", "service", serviceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1674,13 +1695,18 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
|
||||||
ws.Add(iter.WatchCh())
|
ws.Add(iter.WatchCh())
|
||||||
|
|
||||||
// Return the results, filtering by tag.
|
// Return the results, filtering by tag.
|
||||||
|
serviceExists := false
|
||||||
var results structs.ServiceNodes
|
var results structs.ServiceNodes
|
||||||
for service := iter.Next(); service != nil; service = iter.Next() {
|
for service := iter.Next(); service != nil; service = iter.Next() {
|
||||||
svc := service.(*structs.ServiceNode)
|
svc := service.(*structs.ServiceNode)
|
||||||
|
serviceExists = true
|
||||||
if !serviceTagsFilter(svc, tags) {
|
if !serviceTagsFilter(svc, tags) {
|
||||||
results = append(results, svc)
|
results = append(results, svc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexForService(tx, serviceName, serviceExists, true)
|
||||||
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
|
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2848,16 +2848,68 @@ func TestIndexIndependence(t *testing.T) {
|
||||||
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
|
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
|
||||||
|
|
||||||
testRegisterService(t, s, 18, "node1", "service_new")
|
testRegisterService(t, s, 18, "node1", "service_new")
|
||||||
// Since service does not exists anymore, its index should be last insert
|
|
||||||
// The behaviour is the same as all non-existing services, meaning
|
// Since service does not exists anymore, its index should be that of
|
||||||
// we properly did collect garbage
|
// the last deleted service
|
||||||
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
|
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
|
||||||
|
|
||||||
// No index should exist anymore, it must have been garbage collected
|
// No index should exist anymore, it must have been garbage collected
|
||||||
ensureIndexForService(t, s, ws, "service_shared", 0)
|
ensureIndexForService(t, s, ws, "service_shared", 0)
|
||||||
if !watchFired(ws) {
|
if !watchFired(ws) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMissingServiceIndex(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Querying with no matches gives an empty response
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
idx, res, err := s.CheckServiceNodes(ws, "service1")
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Nil(t, res)
|
||||||
|
|
||||||
|
// index should be 0 for a non existing service at startup
|
||||||
|
require.Equal(t, uint64(0), idx)
|
||||||
|
|
||||||
|
testRegisterNode(t, s, 0, "node1")
|
||||||
|
|
||||||
|
// node operations should not affect missing service index
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 0, 0)
|
||||||
|
|
||||||
|
testRegisterService(t, s, 10, "node1", "service1")
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 10, 1)
|
||||||
|
|
||||||
|
s.DeleteService(11, "node1", "service1")
|
||||||
|
// service1 is now missing, its index is now that of the last index a service was
|
||||||
|
// deleted at
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 11, 0)
|
||||||
|
|
||||||
|
testRegisterService(t, s, 12, "node1", "service2")
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 12, 1)
|
||||||
|
|
||||||
|
// missing service index does not change even though another service have been
|
||||||
|
// registered
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 11, 0)
|
||||||
|
ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0)
|
||||||
|
|
||||||
|
// registering a service on another node does not affect missing service
|
||||||
|
// index
|
||||||
|
testRegisterNode(t, s, 13, "node2")
|
||||||
|
testRegisterService(t, s, 14, "node2", "service3")
|
||||||
|
ensureServiceVersion(t, s, ws, "service3", 14, 1)
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 11, 0)
|
||||||
|
|
||||||
|
// unregistering a service bumps missing service index
|
||||||
|
s.DeleteService(15, "node2", "service3")
|
||||||
|
ensureServiceVersion(t, s, ws, "service3", 15, 0)
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 12, 1)
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 15, 0)
|
||||||
|
ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0)
|
||||||
|
|
||||||
|
// registering again a missing service correctly updates its index
|
||||||
|
testRegisterService(t, s, 16, "node1", "service1")
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 16, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_CheckServiceNodes(t *testing.T) {
|
func TestStateStore_CheckServiceNodes(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue