mirror of https://github.com/status-im/consul.git
Merge pull request #3899 from pierresouchay/fix_blocking_queries_index
Services Indexes modified per service instead of using a global Index
This commit is contained in:
commit
9a47449c6d
|
@ -517,7 +517,11 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
|
|||
}
|
||||
var sids []string
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
sids = append(sids, service.(*structs.ServiceNode).ServiceID)
|
||||
svc := service.(*structs.ServiceNode)
|
||||
sids = append(sids, svc.ServiceID)
|
||||
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Do the delete in a separate loop so we don't trash the iterator.
|
||||
|
@ -638,6 +642,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
|
|||
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -752,14 +759,30 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
|
|||
return idx, results, nil
|
||||
}
|
||||
|
||||
// maxIndexForService return the maximum Raft Index for a service
|
||||
// If the index is not set for the service, it will return:
|
||||
// - maxIndex(nodes, services) if checks is false
|
||||
// - maxIndex(nodes, services, checks) if checks is true
|
||||
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
|
||||
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
|
||||
if err == nil {
|
||||
if idx, ok := transaction.(*IndexEntry); ok {
|
||||
return idx.Value
|
||||
}
|
||||
}
|
||||
if checks {
|
||||
return maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
}
|
||||
return maxIndexTxn(tx, "nodes", "services")
|
||||
}
|
||||
|
||||
// ServiceNodes returns the nodes associated with a given service name.
|
||||
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "nodes", "services")
|
||||
|
||||
idx := maxIndexForService(tx, serviceName, false)
|
||||
// List all the services.
|
||||
services, err := tx.Get("services", "service", serviceName)
|
||||
if err != nil {
|
||||
|
@ -787,7 +810,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "nodes", "services")
|
||||
idx := maxIndexForService(tx, service, false)
|
||||
|
||||
// List all the services.
|
||||
services, err := tx.Get("services", "service", service)
|
||||
|
@ -979,6 +1002,10 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func serviceIndexName(name string) string {
|
||||
return fmt.Sprintf("service.%s", name)
|
||||
}
|
||||
|
||||
// deleteServiceTxn is the inner method called to remove a service
|
||||
// registration within an existing transaction.
|
||||
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
|
||||
|
@ -1022,6 +1049,26 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
|
|||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
svc := service.(*structs.ServiceNode)
|
||||
if remainingService, err := tx.First("services", "service", svc.ServiceName); err == nil {
|
||||
if remainingService != nil {
|
||||
// We have at least one remaining service, update the index
|
||||
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
} else {
|
||||
// There are no more service instances, cleanup the service.<serviceName> index
|
||||
serviceIndex, err := tx.First("index", "id", serviceIndexName(svc.ServiceName))
|
||||
if err == nil && serviceIndex != nil {
|
||||
// we found service.<serviceName> index, garbage collect it
|
||||
if errW := tx.Delete("index", serviceIndex); errW != nil {
|
||||
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1087,6 +1134,21 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
|
|||
svc := service.(*structs.ServiceNode)
|
||||
hc.ServiceName = svc.ServiceName
|
||||
hc.ServiceTags = svc.ServiceTags
|
||||
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
} else {
|
||||
// Update the status for all the services associated with this node
|
||||
services, err := tx.Get("services", "node", hc.Node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
|
||||
}
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
svc := service.(*structs.ServiceNode).ToNodeService()
|
||||
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete any sessions for this check if the health is critical.
|
||||
|
@ -1199,8 +1261,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "nodes", "checks")
|
||||
|
||||
idx := maxIndexForService(tx, serviceName, true)
|
||||
// Return the checks.
|
||||
iter, err := tx.Get("checks", "service", serviceName)
|
||||
if err != nil {
|
||||
|
@ -1328,6 +1389,22 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
|
|||
if hc == nil {
|
||||
return nil
|
||||
}
|
||||
existing := hc.(*structs.HealthCheck)
|
||||
if existing != nil && existing.ServiceID != "" {
|
||||
service, err := tx.First("services", "id", node, existing.ServiceID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
if service == nil {
|
||||
return ErrMissingService
|
||||
}
|
||||
|
||||
// Updated index of service
|
||||
svc := service.(*structs.ServiceNode)
|
||||
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete the check from the DB and update the index.
|
||||
if err := tx.Delete("checks", hc); err != nil {
|
||||
|
@ -1363,7 +1440,7 @@ func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
idx := maxIndexForService(tx, serviceName, true)
|
||||
|
||||
// Query the state store for the service.
|
||||
iter, err := tx.Get("services", "service", serviceName)
|
||||
|
@ -1387,7 +1464,7 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string)
|
|||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, "nodes", "services", "checks")
|
||||
idx := maxIndexForService(tx, serviceName, true)
|
||||
|
||||
// Query the state store for the service.
|
||||
iter, err := tx.Get("services", "service", serviceName)
|
||||
|
|
|
@ -2166,6 +2166,126 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) {
|
||||
idx, services, err := s.ServiceNodes(ws, serviceID)
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != expectedIdx {
|
||||
t.Fatalf("bad: %d, expected %d", idx, expectedIdx)
|
||||
}
|
||||
if len(services) != expectedSize {
|
||||
t.Fatalf("expected size: %d, but was %d", expectedSize, len(services))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure index exist, if expectedIndex = -1, ensure the index does not exists
|
||||
func ensureIndexForService(t *testing.T, s *Store, ws memdb.WatchSet, serviceName string, expectedIndex uint64) {
|
||||
t.Helper()
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
transaction, err := tx.First("index", "id", fmt.Sprintf("service.%s", serviceName))
|
||||
if err == nil {
|
||||
if idx, ok := transaction.(*IndexEntry); ok {
|
||||
if expectedIndex != idx.Value {
|
||||
t.Fatalf("Expected index %d, but had %d for %s", expectedIndex, idx.Value, serviceName)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
if expectedIndex != 0 {
|
||||
t.Fatalf("Index for %s was expected but not found", serviceName)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIndexIndependance test that changes on a given service does not impact the
|
||||
// index of other services. It allows to have huge benefits for watches since
|
||||
// watchers are notified ONLY when there are changes in the given service
|
||||
func TestIndexIndependance(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Querying with no matches gives an empty response
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, res, err := s.CheckServiceNodes(ws, "service1")
|
||||
if idx != 0 || res != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||
}
|
||||
|
||||
// Register some nodes.
|
||||
testRegisterNode(t, s, 0, "node1")
|
||||
testRegisterNode(t, s, 1, "node2")
|
||||
|
||||
// Register node-level checks. These should be the final result.
|
||||
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
|
||||
testRegisterCheck(t, s, 3, "node2", "", "check2", api.HealthPassing)
|
||||
|
||||
// Register a service against the nodes.
|
||||
testRegisterService(t, s, 4, "node1", "service1")
|
||||
testRegisterService(t, s, 5, "node2", "service2")
|
||||
ensureServiceVersion(t, s, ws, "service2", 5, 1)
|
||||
|
||||
// Register checks against the services.
|
||||
testRegisterCheck(t, s, 6, "node1", "service1", "check3", api.HealthPassing)
|
||||
testRegisterCheck(t, s, 7, "node2", "service2", "check4", api.HealthPassing)
|
||||
// Index must be updated when checks are updated
|
||||
ensureServiceVersion(t, s, ws, "service1", 6, 1)
|
||||
ensureServiceVersion(t, s, ws, "service2", 7, 1)
|
||||
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
// We ensure the idx for service2 has not been changed
|
||||
testRegisterCheck(t, s, 8, "node2", "service2", "check4", api.HealthWarning)
|
||||
ensureServiceVersion(t, s, ws, "service2", 8, 1)
|
||||
testRegisterCheck(t, s, 9, "node2", "service2", "check4", api.HealthPassing)
|
||||
ensureServiceVersion(t, s, ws, "service2", 9, 1)
|
||||
|
||||
// Add a new check on node1, while not on service, it should impact
|
||||
// indexes of all services running on node1, aka service1
|
||||
testRegisterCheck(t, s, 10, "node1", "", "check_node", api.HealthPassing)
|
||||
|
||||
// Service2 should not be modified
|
||||
ensureServiceVersion(t, s, ws, "service2", 9, 1)
|
||||
// Service1 should be modified
|
||||
ensureServiceVersion(t, s, ws, "service1", 10, 1)
|
||||
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
testRegisterService(t, s, 11, "node1", "service_shared")
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 11, 1)
|
||||
testRegisterService(t, s, 12, "node2", "service_shared")
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 12, 2)
|
||||
|
||||
testRegisterCheck(t, s, 13, "node2", "service_shared", "check_service_shared", api.HealthCritical)
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 13, 2)
|
||||
testRegisterCheck(t, s, 14, "node2", "service_shared", "check_service_shared", api.HealthPassing)
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 14, 2)
|
||||
|
||||
s.DeleteCheck(15, "node2", types.CheckID("check_service_shared"))
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
|
||||
ensureIndexForService(t, s, ws, "service_shared", 15)
|
||||
s.DeleteService(16, "node2", "service_shared")
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
|
||||
ensureIndexForService(t, s, ws, "service_shared", 16)
|
||||
s.DeleteService(17, "node1", "service_shared")
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
|
||||
|
||||
testRegisterService(t, s, 18, "node1", "service_new")
|
||||
// Since service does not exists anymore, its index should be last insert
|
||||
// The behaviour is the same as all non-existing services, meaning
|
||||
// we properly did collect garbage
|
||||
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
|
||||
// No index should exist anymore, it must have been garbage collected
|
||||
ensureIndexForService(t, s, ws, "service_shared", 0)
|
||||
if !watchFired(ws) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestStateStore_CheckServiceNodes(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
@ -2197,14 +2317,19 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
|
|||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
// We ensure the idx for service2 has not been changed
|
||||
ensureServiceVersion(t, s, ws, "service2", 7, 1)
|
||||
|
||||
// Query the state store for nodes and checks which have been registered
|
||||
// with a specific service.
|
||||
ws = memdb.NewWatchSet()
|
||||
ensureServiceVersion(t, s, ws, "service1", 6, 1)
|
||||
idx, results, err := s.CheckServiceNodes(ws, "service1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 7 {
|
||||
// registered with ensureServiceVersion(t, s, ws, "service1", 6, 1)
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
@ -2229,7 +2354,8 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 8 {
|
||||
// service1 has been registered at idx=6, other different registrations do not count
|
||||
if idx != 6 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,26 @@ Consul is A, and version B is released.
|
|||
by running `consul members` to make sure all members have the latest
|
||||
build and highest protocol version.
|
||||
|
||||
## Upgrade from Version 1.0.6 to higher
|
||||
|
||||
In version 1.0.7 and higher, when requesting a specific service
|
||||
(`/v1/health/:service` or `/v1/catalog/:service` endpoints), the
|
||||
`X-Consul-Index` returned is now the index at which that specific service was
|
||||
last modified.
|
||||
In version 1.0.6 and earlier the X-Consul-Index returned was the index at
|
||||
which any service was last modified. See
|
||||
[GH-3890](https://github.com/hashicorp/consul/issues/3890) for more details.
|
||||
|
||||
During upgrades from 1.0.6 or lower to 1.0.7 or higher, watchers are likely to
|
||||
see `X-Consul-Index` for these endpoints decrease between blocking calls.
|
||||
|
||||
Consul’s watch feature and consul-template should gracefully handle this case.
|
||||
Other tools relying on blocking service or health queries are also likely to
|
||||
work; some may require a restart. It is possible external tools could break and
|
||||
either stop working or continually re-request data without blocking if they
|
||||
have assumed indexes can never decrease or be reset and/or persist index
|
||||
values. Please test any blocking query integrations in a controlled environment
|
||||
before proceeding.
|
||||
|
||||
## Backward Incompatible Upgrades
|
||||
|
||||
|
|
Loading…
Reference in New Issue