mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 06:44:41 +00:00
[BUGFIX] When a node level check is removed, ensure all services of node are notified
Bugfix for https://github.com/hashicorp/consul/pull/3899 When a node level check is removed (example: maintenance), some watchers on services might have to recompute their state. If those nodes are performing blocking queries, they have to be notified. While their state was updated when node-level state did change or was added this was not the case when the check was removed. This fixes it.
This commit is contained in:
parent
da2d5304cb
commit
a44b9e84b1
@ -1086,6 +1086,43 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This ensure all connected services to a check are updated properly
|
||||||
|
func (s *Store) alterServicesFromCheck(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck, checkServiceExists bool) error {
|
||||||
|
// If the check is associated with a service, check that we have
|
||||||
|
// a registration for the service.
|
||||||
|
if hc.ServiceID != "" {
|
||||||
|
service, err := tx.First("services", "id", hc.Node, hc.ServiceID)
|
||||||
|
// When deleting a service, service might have been removed already
|
||||||
|
if err != nil && checkServiceExists {
|
||||||
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
|
}
|
||||||
|
if service == nil {
|
||||||
|
return ErrMissingService
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy in the service name and tags
|
||||||
|
svc := service.(*structs.ServiceNode)
|
||||||
|
hc.ServiceName = svc.ServiceName
|
||||||
|
hc.ServiceTags = svc.ServiceTags
|
||||||
|
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Update the status for all the services associated with this node
|
||||||
|
services, err := tx.Get("services", "node", hc.Node)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
|
||||||
|
}
|
||||||
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
|
svc := service.(*structs.ServiceNode).ToNodeService()
|
||||||
|
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ensureCheckTransaction is used as the inner method to handle inserting
|
// ensureCheckTransaction is used as the inner method to handle inserting
|
||||||
// a health check into the state store. It ensures safety against inserting
|
// a health check into the state store. It ensures safety against inserting
|
||||||
// checks with no matching node or service.
|
// checks with no matching node or service.
|
||||||
@ -1119,36 +1156,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
|
|||||||
return ErrMissingNode
|
return ErrMissingNode
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the check is associated with a service, check that we have
|
err = s.alterServicesFromCheck(tx, idx, hc, true)
|
||||||
// a registration for the service.
|
|
||||||
if hc.ServiceID != "" {
|
|
||||||
service, err := tx.First("services", "id", hc.Node, hc.ServiceID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return err
|
||||||
}
|
|
||||||
if service == nil {
|
|
||||||
return ErrMissingService
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy in the service name and tags
|
|
||||||
svc := service.(*structs.ServiceNode)
|
|
||||||
hc.ServiceName = svc.ServiceName
|
|
||||||
hc.ServiceTags = svc.ServiceTags
|
|
||||||
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Update the status for all the services associated with this node
|
|
||||||
services, err := tx.Get("services", "node", hc.Node)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
|
|
||||||
}
|
|
||||||
for service := services.Next(); service != nil; service = services.Next() {
|
|
||||||
svc := service.(*structs.ServiceNode).ToNodeService()
|
|
||||||
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
|
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete any sessions for this check if the health is critical.
|
// Delete any sessions for this check if the health is critical.
|
||||||
@ -1390,19 +1400,10 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
existing := hc.(*structs.HealthCheck)
|
existing := hc.(*structs.HealthCheck)
|
||||||
if existing != nil && existing.ServiceID != "" {
|
if existing != nil {
|
||||||
service, err := tx.First("services", "id", node, existing.ServiceID)
|
err = s.alterServicesFromCheck(tx, idx, existing, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed service lookup: %s", err)
|
return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err)
|
||||||
}
|
|
||||||
if service == nil {
|
|
||||||
return ErrMissingService
|
|
||||||
}
|
|
||||||
|
|
||||||
// Updated index of service
|
|
||||||
svc := service.(*structs.ServiceNode)
|
|
||||||
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2119,6 +2119,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
|||||||
// Register a node and a node-level health check.
|
// Register a node and a node-level health check.
|
||||||
testRegisterNode(t, s, 1, "node1")
|
testRegisterNode(t, s, 1, "node1")
|
||||||
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
|
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
|
||||||
|
testRegisterService(t, s, 2, "node1", "service1")
|
||||||
|
|
||||||
// Make sure the check is there.
|
// Make sure the check is there.
|
||||||
ws := memdb.NewWatchSet()
|
ws := memdb.NewWatchSet()
|
||||||
@ -2130,6 +2131,8 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
|||||||
t.Fatalf("bad: %#v", checks)
|
t.Fatalf("bad: %#v", checks)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 2, 1)
|
||||||
|
|
||||||
// Delete the check.
|
// Delete the check.
|
||||||
if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
|
if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
@ -2137,6 +2140,8 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
|||||||
if !watchFired(ws) {
|
if !watchFired(ws) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
// All services linked to this node should have their index updated
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 3, 1)
|
||||||
|
|
||||||
// Check is gone
|
// Check is gone
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user