mirror of https://github.com/status-im/consul.git
Services Indexes modified per service instead of using a global Index
This patch improves the watches for services on large cluster: each service has now its own index, such watches on a specific service are not modified by changes in the global catalog. It should improve a lot the performance of tools such as consul-template or libraries performing watches on very large clusters with many services/watches.
This commit is contained in:
parent
1ce90e2a19
commit
73127ef407
|
@ -517,7 +517,11 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
|
||||||
}
|
}
|
||||||
var sids []string
|
var sids []string
|
||||||
for service := services.Next(); service != nil; service = services.Next() {
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
sids = append(sids, service.(*structs.ServiceNode).ServiceID)
|
svc := service.(*structs.ServiceNode)
|
||||||
|
sids = append(sids, svc.ServiceID)
|
||||||
|
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do the delete in a separate loop so we don't trash the iterator.
|
// Do the delete in a separate loop so we don't trash the iterator.
|
||||||
|
@ -638,6 +642,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
|
||||||
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
|
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
|
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -752,14 +759,29 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
|
||||||
return idx, results, nil
|
return idx, results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// maxIndexForService return the maximum transaction number for a service
|
||||||
|
// If the transaction is not set for the service, it will return the max
|
||||||
|
// transaction number of "nodes", "services"
|
||||||
|
func maxIndexForService(tx *memdb.Txn, serviceName string) (uint64, error) {
|
||||||
|
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
|
||||||
|
if err == nil {
|
||||||
|
if idx, ok := transaction.(*IndexEntry); ok {
|
||||||
|
return idx.Value, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return maxIndexTxn(tx, "nodes", "services"), nil
|
||||||
|
}
|
||||||
|
|
||||||
// ServiceNodes returns the nodes associated with a given service name.
|
// ServiceNodes returns the nodes associated with a given service name.
|
||||||
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := maxIndexTxn(tx, "nodes", "services")
|
idx, err := maxIndexForService(tx, serviceName)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
|
||||||
|
}
|
||||||
// List all the services.
|
// List all the services.
|
||||||
services, err := tx.Get("services", "service", serviceName)
|
services, err := tx.Get("services", "service", serviceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -787,7 +809,10 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := maxIndexTxn(tx, "nodes", "services")
|
idx, err := maxIndexForService(tx, service)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err))
|
||||||
|
}
|
||||||
|
|
||||||
// List all the services.
|
// List all the services.
|
||||||
services, err := tx.Get("services", "service", service)
|
services, err := tx.Get("services", "service", service)
|
||||||
|
@ -979,6 +1004,10 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func serviceIndexName(name string) string {
|
||||||
|
return fmt.Sprintf("service.%s", name)
|
||||||
|
}
|
||||||
|
|
||||||
// deleteServiceTxn is the inner method called to remove a service
|
// deleteServiceTxn is the inner method called to remove a service
|
||||||
// registration within an existing transaction.
|
// registration within an existing transaction.
|
||||||
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
|
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
|
||||||
|
@ -1022,6 +1051,26 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
|
||||||
return fmt.Errorf("failed updating index: %s", err)
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
svc := service.(*structs.ServiceNode)
|
||||||
|
if remainingServicesItr, err := tx.Get("services", "service", svc.ServiceName); err == nil {
|
||||||
|
if remainingServicesItr != nil && remainingServicesItr.Next() != nil {
|
||||||
|
// We have at least one remaining service, update the index
|
||||||
|
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// There are no more service instances, cleanup the service.<serviceName> index
|
||||||
|
serviceIndex, err := tx.First("index", "id", serviceIndexName(svc.ServiceName))
|
||||||
|
if err == nil && serviceIndex != nil {
|
||||||
|
// we found service.<serviceName> index, garbage collect it
|
||||||
|
if errW := tx.Delete("index", serviceIndex); errW != nil {
|
||||||
|
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1087,6 +1136,21 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
|
||||||
svc := service.(*structs.ServiceNode)
|
svc := service.(*structs.ServiceNode)
|
||||||
hc.ServiceName = svc.ServiceName
|
hc.ServiceName = svc.ServiceName
|
||||||
hc.ServiceTags = svc.ServiceTags
|
hc.ServiceTags = svc.ServiceTags
|
||||||
|
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Update the status for all the services associated with this node
|
||||||
|
services, err := tx.Get("services", "node", hc.Node)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
|
||||||
|
}
|
||||||
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
|
svc := service.(*structs.ServiceNode).ToNodeService()
|
||||||
|
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete any sessions for this check if the health is critical.
|
// Delete any sessions for this check if the health is critical.
|
||||||
|
@ -1328,6 +1392,22 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
|
||||||
if hc == nil {
|
if hc == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
existing := hc.(*structs.HealthCheck)
|
||||||
|
if existing != nil && existing.ServiceID != "" {
|
||||||
|
service, err := tx.First("services", "id", node, existing.ServiceID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed service lookup: %s", err)
|
||||||
|
}
|
||||||
|
if service == nil {
|
||||||
|
return ErrMissingService
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updated index of service
|
||||||
|
svc := service.(*structs.ServiceNode)
|
||||||
|
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Delete the check from the DB and update the index.
|
// Delete the check from the DB and update the index.
|
||||||
if err := tx.Delete("checks", hc); err != nil {
|
if err := tx.Delete("checks", hc); err != nil {
|
||||||
|
|
|
@ -2166,6 +2166,101 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) {
|
||||||
|
idx, services, err := s.ServiceNodes(ws, serviceID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != expectedIdx {
|
||||||
|
t.Fatalf("bad: %d, expected %d", idx, expectedIdx)
|
||||||
|
}
|
||||||
|
if len(services) != expectedSize {
|
||||||
|
t.Fatalf("expected size: %d, but was %d", expectedSize, len(services))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestIndexIndependance test that changes on a given service does not impact the
|
||||||
|
// index of other services. It allows to have huge benefits for watches since
|
||||||
|
// watchers are notified ONLY when there are changes in the given service
|
||||||
|
func TestIndexIndependance(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Querying with no matches gives an empty response
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
idx, res, err := s.CheckServiceNodes(ws, "service1")
|
||||||
|
if idx != 0 || res != nil || err != nil {
|
||||||
|
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register some nodes.
|
||||||
|
testRegisterNode(t, s, 0, "node1")
|
||||||
|
testRegisterNode(t, s, 1, "node2")
|
||||||
|
|
||||||
|
// Register node-level checks. These should be the final result.
|
||||||
|
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
|
||||||
|
testRegisterCheck(t, s, 3, "node2", "", "check2", api.HealthPassing)
|
||||||
|
|
||||||
|
// Register a service against the nodes.
|
||||||
|
testRegisterService(t, s, 4, "node1", "service1")
|
||||||
|
testRegisterService(t, s, 5, "node2", "service2")
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 5, 1)
|
||||||
|
|
||||||
|
// Register checks against the services.
|
||||||
|
testRegisterCheck(t, s, 6, "node1", "service1", "check3", api.HealthPassing)
|
||||||
|
testRegisterCheck(t, s, 7, "node2", "service2", "check4", api.HealthPassing)
|
||||||
|
// Index must be updated when checks are updated
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 6, 1)
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 7, 1)
|
||||||
|
|
||||||
|
if !watchFired(ws) {
|
||||||
|
t.Fatalf("bad")
|
||||||
|
}
|
||||||
|
// We ensure the idx for service2 has not been changed
|
||||||
|
testRegisterCheck(t, s, 8, "node2", "service2", "check4", api.HealthWarning)
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 8, 1)
|
||||||
|
testRegisterCheck(t, s, 9, "node2", "service2", "check4", api.HealthPassing)
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 9, 1)
|
||||||
|
|
||||||
|
// Add a new check on node1, while not on service, it should impact
|
||||||
|
// indexes of all services running on node1, aka service1
|
||||||
|
testRegisterCheck(t, s, 10, "node1", "", "check_node", api.HealthPassing)
|
||||||
|
|
||||||
|
// Service2 should not be modified
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 9, 1)
|
||||||
|
// Service1 should be modified
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 10, 1)
|
||||||
|
|
||||||
|
if !watchFired(ws) {
|
||||||
|
t.Fatalf("bad")
|
||||||
|
}
|
||||||
|
|
||||||
|
testRegisterService(t, s, 11, "node1", "service_shared")
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 11, 1)
|
||||||
|
testRegisterService(t, s, 12, "node2", "service_shared")
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 12, 2)
|
||||||
|
|
||||||
|
testRegisterCheck(t, s, 13, "node2", "service_shared", "check_service_shared", api.HealthCritical)
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 13, 2)
|
||||||
|
testRegisterCheck(t, s, 14, "node2", "service_shared", "check_service_shared", api.HealthPassing)
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 14, 2)
|
||||||
|
|
||||||
|
s.DeleteCheck(15, "node2", types.CheckID("check_service_shared"))
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
|
||||||
|
s.DeleteService(16, "node2", "service_shared")
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
|
||||||
|
s.DeleteService(17, "node1", "service_shared")
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
|
||||||
|
|
||||||
|
testRegisterService(t, s, 18, "node1", "service_new")
|
||||||
|
// Since service does not exists anymore, its index should be last insert
|
||||||
|
// The behaviour is the same as all non-existing services, meaning
|
||||||
|
// we properly did collect garbage
|
||||||
|
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
|
||||||
|
if !watchFired(ws) {
|
||||||
|
t.Fatalf("bad")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateStore_CheckServiceNodes(t *testing.T) {
|
func TestStateStore_CheckServiceNodes(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
@ -2197,9 +2292,13 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
|
||||||
t.Fatalf("bad")
|
t.Fatalf("bad")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We ensure the idx for service2 has not been changed
|
||||||
|
ensureServiceVersion(t, s, ws, "service2", 7, 1)
|
||||||
|
|
||||||
// Query the state store for nodes and checks which have been registered
|
// Query the state store for nodes and checks which have been registered
|
||||||
// with a specific service.
|
// with a specific service.
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
|
ensureServiceVersion(t, s, ws, "service1", 6, 1)
|
||||||
idx, results, err := s.CheckServiceNodes(ws, "service1")
|
idx, results, err := s.CheckServiceNodes(ws, "service1")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
|
|
Loading…
Reference in New Issue