From 73127ef407f3323483f7369826b251e22b72ecc2 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 19 Feb 2018 18:29:22 +0100
Subject: [PATCH 01/16] Services Indexes modified per service instead of using
a global Index
This patch improves the watches for services on large cluster:
each service has now its own index, such watches on a specific service
are not modified by changes in the global catalog.
It should improve a lot the performance of tools such as consul-template
or libraries performing watches on very large clusters with many
services/watches.
---
agent/consul/state/catalog.go | 88 ++++++++++++++++++++++++--
agent/consul/state/catalog_test.go | 99 ++++++++++++++++++++++++++++++
2 files changed, 183 insertions(+), 4 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 9f066d4178..16fb62cdff 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -517,7 +517,11 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
}
var sids []string
for service := services.Next(); service != nil; service = services.Next() {
- sids = append(sids, service.(*structs.ServiceNode).ServiceID)
+ svc := service.(*structs.ServiceNode)
+ sids = append(sids, svc.ServiceID)
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
}
// Do the delete in a separate loop so we don't trash the iterator.
@@ -638,6 +642,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
return nil
}
@@ -752,14 +759,29 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
return idx, results, nil
}
+// maxIndexForService return the maximum transaction number for a service
+// If the transaction is not set for the service, it will return the max
+// transaction number of "nodes", "services"
+func maxIndexForService(tx *memdb.Txn, serviceName string) (uint64, error) {
+ transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
+ if err == nil {
+ if idx, ok := transaction.(*IndexEntry); ok {
+ return idx.Value, nil
+ }
+ }
+ return maxIndexTxn(tx, "nodes", "services"), nil
+}
+
// ServiceNodes returns the nodes associated with a given service name.
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
- idx := maxIndexTxn(tx, "nodes", "services")
-
+ idx, err := maxIndexForService(tx, serviceName)
+ if err != nil {
+ panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
+ }
// List all the services.
services, err := tx.Get("services", "service", serviceName)
if err != nil {
@@ -787,7 +809,10 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
defer tx.Abort()
// Get the table index.
- idx := maxIndexTxn(tx, "nodes", "services")
+ idx, err := maxIndexForService(tx, service)
+ if err != nil {
+ panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err))
+ }
// List all the services.
services, err := tx.Get("services", "service", service)
@@ -979,6 +1004,10 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
return nil
}
+func serviceIndexName(name string) string {
+ return fmt.Sprintf("service.%s", name)
+}
+
// deleteServiceTxn is the inner method called to remove a service
// registration within an existing transaction.
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
@@ -1022,6 +1051,26 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
return fmt.Errorf("failed updating index: %s", err)
}
+ svc := service.(*structs.ServiceNode)
+ if remainingServicesItr, err := tx.Get("services", "service", svc.ServiceName); err == nil {
+ if remainingServicesItr != nil && remainingServicesItr.Next() != nil {
+ // We have at least one remaining service, update the index
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ } else {
+ // There are no more service instances, cleanup the service. index
+ serviceIndex, err := tx.First("index", "id", serviceIndexName(svc.ServiceName))
+ if err == nil && serviceIndex != nil {
+ // we found service. index, garbage collect it
+ if errW := tx.Delete("index", serviceIndex); errW != nil {
+ return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
+ }
+ }
+ }
+ } else {
+ return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
+ }
return nil
}
@@ -1087,6 +1136,21 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
+ if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ } else {
+ // Update the status for all the services associated with this node
+ services, err := tx.Get("services", "node", hc.Node)
+ if err != nil {
+ return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
+ }
+ for service := services.Next(); service != nil; service = services.Next() {
+ svc := service.(*structs.ServiceNode).ToNodeService()
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ }
}
// Delete any sessions for this check if the health is critical.
@@ -1328,6 +1392,22 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
if hc == nil {
return nil
}
+ existing := hc.(*structs.HealthCheck)
+ if existing != nil && existing.ServiceID != "" {
+ service, err := tx.First("services", "id", node, existing.ServiceID)
+ if err != nil {
+ return fmt.Errorf("failed service lookup: %s", err)
+ }
+ if service == nil {
+ return ErrMissingService
+ }
+
+ // Updated index of service
+ svc := service.(*structs.ServiceNode)
+ if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ }
// Delete the check from the DB and update the index.
if err := tx.Delete("checks", hc); err != nil {
diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go
index e948b092c4..f3d9f3e8d7 100644
--- a/agent/consul/state/catalog_test.go
+++ b/agent/consul/state/catalog_test.go
@@ -2166,6 +2166,101 @@ func TestStateStore_DeleteCheck(t *testing.T) {
}
}
+func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) {
+ idx, services, err := s.ServiceNodes(ws, serviceID)
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+ if idx != expectedIdx {
+ t.Fatalf("bad: %d, expected %d", idx, expectedIdx)
+ }
+ if len(services) != expectedSize {
+ t.Fatalf("expected size: %d, but was %d", expectedSize, len(services))
+ }
+}
+
+// TestIndexIndependance test that changes on a given service does not impact the
+// index of other services. It allows to have huge benefits for watches since
+// watchers are notified ONLY when there are changes in the given service
+func TestIndexIndependance(t *testing.T) {
+ s := testStateStore(t)
+
+ // Querying with no matches gives an empty response
+ ws := memdb.NewWatchSet()
+ idx, res, err := s.CheckServiceNodes(ws, "service1")
+ if idx != 0 || res != nil || err != nil {
+ t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
+ }
+
+ // Register some nodes.
+ testRegisterNode(t, s, 0, "node1")
+ testRegisterNode(t, s, 1, "node2")
+
+ // Register node-level checks. These should be the final result.
+ testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
+ testRegisterCheck(t, s, 3, "node2", "", "check2", api.HealthPassing)
+
+ // Register a service against the nodes.
+ testRegisterService(t, s, 4, "node1", "service1")
+ testRegisterService(t, s, 5, "node2", "service2")
+ ensureServiceVersion(t, s, ws, "service2", 5, 1)
+
+ // Register checks against the services.
+ testRegisterCheck(t, s, 6, "node1", "service1", "check3", api.HealthPassing)
+ testRegisterCheck(t, s, 7, "node2", "service2", "check4", api.HealthPassing)
+ // Index must be updated when checks are updated
+ ensureServiceVersion(t, s, ws, "service1", 6, 1)
+ ensureServiceVersion(t, s, ws, "service2", 7, 1)
+
+ if !watchFired(ws) {
+ t.Fatalf("bad")
+ }
+ // We ensure the idx for service2 has not been changed
+ testRegisterCheck(t, s, 8, "node2", "service2", "check4", api.HealthWarning)
+ ensureServiceVersion(t, s, ws, "service2", 8, 1)
+ testRegisterCheck(t, s, 9, "node2", "service2", "check4", api.HealthPassing)
+ ensureServiceVersion(t, s, ws, "service2", 9, 1)
+
+ // Add a new check on node1, while not on service, it should impact
+ // indexes of all services running on node1, aka service1
+ testRegisterCheck(t, s, 10, "node1", "", "check_node", api.HealthPassing)
+
+ // Service2 should not be modified
+ ensureServiceVersion(t, s, ws, "service2", 9, 1)
+ // Service1 should be modified
+ ensureServiceVersion(t, s, ws, "service1", 10, 1)
+
+ if !watchFired(ws) {
+ t.Fatalf("bad")
+ }
+
+ testRegisterService(t, s, 11, "node1", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 11, 1)
+ testRegisterService(t, s, 12, "node2", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 12, 2)
+
+ testRegisterCheck(t, s, 13, "node2", "service_shared", "check_service_shared", api.HealthCritical)
+ ensureServiceVersion(t, s, ws, "service_shared", 13, 2)
+ testRegisterCheck(t, s, 14, "node2", "service_shared", "check_service_shared", api.HealthPassing)
+ ensureServiceVersion(t, s, ws, "service_shared", 14, 2)
+
+ s.DeleteCheck(15, "node2", types.CheckID("check_service_shared"))
+ ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
+ s.DeleteService(16, "node2", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
+ s.DeleteService(17, "node1", "service_shared")
+ ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
+
+ testRegisterService(t, s, 18, "node1", "service_new")
+ // Since service does not exists anymore, its index should be last insert
+ // The behaviour is the same as all non-existing services, meaning
+ // we properly did collect garbage
+ ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
+ if !watchFired(ws) {
+ t.Fatalf("bad")
+ }
+}
+
func TestStateStore_CheckServiceNodes(t *testing.T) {
s := testStateStore(t)
@@ -2197,9 +2292,13 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
t.Fatalf("bad")
}
+ // We ensure the idx for service2 has not been changed
+ ensureServiceVersion(t, s, ws, "service2", 7, 1)
+
// Query the state store for nodes and checks which have been registered
// with a specific service.
ws = memdb.NewWatchSet()
+ ensureServiceVersion(t, s, ws, "service1", 6, 1)
idx, results, err := s.CheckServiceNodes(ws, "service1")
if err != nil {
t.Fatalf("err: %s", err)
From bac8fb046f716e2351a4d2cb66577bdb309ac9eb Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 19 Feb 2018 19:30:25 +0100
Subject: [PATCH 02/16] Fixed comment about raftIndex + use test.Helper()
---
agent/consul/state/catalog.go | 6 +++---
agent/consul/state/catalog_test.go | 1 +
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 16fb62cdff..9c3cb3a29d 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -759,9 +759,9 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
return idx, results, nil
}
-// maxIndexForService return the maximum transaction number for a service
-// If the transaction is not set for the service, it will return the max
-// transaction number of "nodes", "services"
+// maxIndexForService return the maximum Raft Index for a service
+// If the index is not set for the service, it will return the max
+// Raft Index of "nodes", "services"
func maxIndexForService(tx *memdb.Txn, serviceName string) (uint64, error) {
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
if err == nil {
diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go
index f3d9f3e8d7..89745389ec 100644
--- a/agent/consul/state/catalog_test.go
+++ b/agent/consul/state/catalog_test.go
@@ -2168,6 +2168,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) {
idx, services, err := s.ServiceNodes(ws, serviceID)
+ t.Helper()
if err != nil {
t.Fatalf("err: %s", err)
}
From 4f10fae3c3ac17d60d8b2d1f9b6650753c68b78b Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 19 Feb 2018 22:44:49 +0100
Subject: [PATCH 03/16] Get only first service to test whether we have to
cleanup index of a service
---
agent/consul/state/catalog.go | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 9c3cb3a29d..cd0ff2f0a1 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -1052,8 +1052,8 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
}
svc := service.(*structs.ServiceNode)
- if remainingServicesItr, err := tx.Get("services", "service", svc.ServiceName); err == nil {
- if remainingServicesItr != nil && remainingServicesItr.Next() != nil {
+ if remainingService, err := tx.First("services", "service", svc.ServiceName); err == nil {
+ if remainingService != nil {
// We have at least one remaining service, update the index
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
From 99c9ea59050558db1e64d430764cfe80f1c27458 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 19 Feb 2018 23:04:17 +0100
Subject: [PATCH 04/16] Do not run tests in parallel since it breaks randomly
---
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.travis.yml b/.travis.yml
index 7d99c9bd3d..d56c6f3ff8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,6 @@ branches:
- master
script:
- - GOTEST_FLAGS="-p 2 -parallel 2" make test
+ - GOTEST_FLAGS="-p 1 -parallel 1" make test
sudo: false
From d15f9d9f8c5708d29cb12f5e3ea3003593919f5c Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 19 Feb 2018 23:34:53 +0100
Subject: [PATCH 05/16] [Revert] Do not run tests in parallel since it breaks
randomly
This is causing timeouts on Travis (more than 10min tests)
---
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.travis.yml b/.travis.yml
index d56c6f3ff8..7d99c9bd3d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,6 @@ branches:
- master
script:
- - GOTEST_FLAGS="-p 1 -parallel 1" make test
+ - GOTEST_FLAGS="-p 2 -parallel 2" make test
sudo: false
From a05d38737c8501c8a28257400b0effc0190a5435 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Tue, 20 Feb 2018 01:28:06 +0100
Subject: [PATCH 06/16] Enable Raft index optimization per service name on
health endpoint
Had to fix unit test in order to check properly indexes.
---
agent/consul/state/catalog.go | 25 ++++++++++++++++++-------
agent/consul/state/catalog_test.go | 6 ++++--
2 files changed, 22 insertions(+), 9 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index cd0ff2f0a1..07975f53c4 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -762,13 +762,16 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
// maxIndexForService return the maximum Raft Index for a service
// If the index is not set for the service, it will return the max
// Raft Index of "nodes", "services"
-func maxIndexForService(tx *memdb.Txn, serviceName string) (uint64, error) {
+func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) (uint64, error) {
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
if err == nil {
if idx, ok := transaction.(*IndexEntry); ok {
return idx.Value, nil
}
}
+ if checks {
+ return maxIndexTxn(tx, "nodes", "services", "checks"), nil
+ }
return maxIndexTxn(tx, "nodes", "services"), nil
}
@@ -778,7 +781,7 @@ func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, str
defer tx.Abort()
// Get the table index.
- idx, err := maxIndexForService(tx, serviceName)
+ idx, err := maxIndexForService(tx, serviceName, false)
if err != nil {
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
}
@@ -809,7 +812,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
defer tx.Abort()
// Get the table index.
- idx, err := maxIndexForService(tx, service)
+ idx, err := maxIndexForService(tx, service, false)
if err != nil {
panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err))
}
@@ -1263,8 +1266,10 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
defer tx.Abort()
// Get the table index.
- idx := maxIndexTxn(tx, "nodes", "checks")
-
+ idx, err := maxIndexForService(tx, serviceName, true)
+ if err != nil {
+ panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
+ }
// Return the checks.
iter, err := tx.Get("checks", "service", serviceName)
if err != nil {
@@ -1443,7 +1448,10 @@ func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
defer tx.Abort()
// Get the table index.
- idx := maxIndexTxn(tx, "nodes", "services", "checks")
+ idx, err := maxIndexForService(tx, serviceName, true)
+ if err != nil {
+ panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
+ }
// Query the state store for the service.
iter, err := tx.Get("services", "service", serviceName)
@@ -1467,7 +1475,10 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string)
defer tx.Abort()
// Get the table index.
- idx := maxIndexTxn(tx, "nodes", "services", "checks")
+ idx, err := maxIndexForService(tx, serviceName, true)
+ if err != nil {
+ panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
+ }
// Query the state store for the service.
iter, err := tx.Get("services", "service", serviceName)
diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go
index 89745389ec..bad625b496 100644
--- a/agent/consul/state/catalog_test.go
+++ b/agent/consul/state/catalog_test.go
@@ -2304,7 +2304,8 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
- if idx != 7 {
+ // registered with ensureServiceVersion(t, s, ws, "service1", 6, 1)
+ if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
@@ -2329,7 +2330,8 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
- if idx != 8 {
+ // service1 has been registered at idx=6, other different registrations do not count
+ if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
From 0861b8b1f9e2b926f83d9dc47b6f660ff2d7f19e Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Tue, 20 Feb 2018 02:07:01 +0100
Subject: [PATCH 07/16] Improve travis parameters to avoid flacky tests
---
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.travis.yml b/.travis.yml
index 7d99c9bd3d..951ac35852 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,6 @@ branches:
- master
script:
- - GOTEST_FLAGS="-p 2 -parallel 2" make test
+ - GOTEST_FLAGS="-p 4 -parallel 1" make test
sudo: false
From 5dd77e132fa218091841167b72153cfcea78bc11 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Tue, 20 Feb 2018 02:14:58 +0100
Subject: [PATCH 08/16] Improve travis parameters to avoid flacky tests, use -p
3
---
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.travis.yml b/.travis.yml
index 951ac35852..454c7233a7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,6 @@ branches:
- master
script:
- - GOTEST_FLAGS="-p 4 -parallel 1" make test
+ - GOTEST_FLAGS="-p 3 -parallel 1" make test
sudo: false
From 60454b570aaae2a7080e7d4657d359171f368193 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Tue, 20 Feb 2018 23:08:04 +0100
Subject: [PATCH 09/16] Only update services if tags are different
---
agent/consul/state/catalog.go | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 07975f53c4..8e3b20d98a 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -2,6 +2,7 @@ package state
import (
"fmt"
+ "reflect"
"strings"
"github.com/hashicorp/consul/agent/structs"
@@ -618,9 +619,12 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
// conversion doesn't populate any of the node-specific information.
// That's always populated when we read from the state store.
entry := svc.ToServiceNode(node)
+ hasSameTags := false
if existing != nil {
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
entry.ModifyIndex = idx
+ eSvc := existing.(*structs.ServiceNode)
+ hasSameTags = reflect.DeepEqual(eSvc.ServiceTags, svc.Tags)
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
@@ -639,8 +643,11 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("services", entry); err != nil {
return fmt.Errorf("failed inserting service: %s", err)
}
- if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
- return fmt.Errorf("failed updating index: %s", err)
+ if !hasSameTags {
+ // We need to update /catalog/services only tags are different
+ if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
}
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
From 09351ba9a622a63a678d8b67a48705dd931a4933 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Tue, 20 Feb 2018 23:34:38 +0100
Subject: [PATCH 10/16] [Revert] Only update services if tags are different
This patch did give some better results, but break watches on
the services of a node.
It is possible to apply the same optimization for nodes than
to services (one index per instance), but it would complicate
further the patch.
Let's do it in another PR.
---
agent/consul/state/catalog.go | 11 ++---------
1 file changed, 2 insertions(+), 9 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 8e3b20d98a..07975f53c4 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -2,7 +2,6 @@ package state
import (
"fmt"
- "reflect"
"strings"
"github.com/hashicorp/consul/agent/structs"
@@ -619,12 +618,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
// conversion doesn't populate any of the node-specific information.
// That's always populated when we read from the state store.
entry := svc.ToServiceNode(node)
- hasSameTags := false
if existing != nil {
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
entry.ModifyIndex = idx
- eSvc := existing.(*structs.ServiceNode)
- hasSameTags = reflect.DeepEqual(eSvc.ServiceTags, svc.Tags)
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
@@ -643,11 +639,8 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if err := tx.Insert("services", entry); err != nil {
return fmt.Errorf("failed inserting service: %s", err)
}
- if !hasSameTags {
- // We need to update /catalog/services only tags are different
- if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
- return fmt.Errorf("failed updating index: %s", err)
- }
+ if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
}
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
From a8d374510466e44e519adf48e3340c28d38ac9e5 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Tue, 20 Feb 2018 23:57:28 +0100
Subject: [PATCH 11/16] Fixed comments for function maxIndexForService
---
agent/consul/state/catalog.go | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 07975f53c4..454b340821 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -760,8 +760,9 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
}
// maxIndexForService return the maximum Raft Index for a service
-// If the index is not set for the service, it will return the max
-// Raft Index of "nodes", "services"
+// If the index is not set for the service, it will return:
+// - maxIndex(nodes, services) if checks if false
+// - maxIndex(nodes, services, checks) if checks if false
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) (uint64, error) {
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
if err == nil {
From 048db1d033d3c8ed65e2569010caca46abe01a01 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Thu, 22 Feb 2018 08:38:42 +0100
Subject: [PATCH 12/16] [Revert] travis tunning as requested by @banks
---
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.travis.yml b/.travis.yml
index 454c7233a7..7d99c9bd3d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,6 @@ branches:
- master
script:
- - GOTEST_FLAGS="-p 3 -parallel 1" make test
+ - GOTEST_FLAGS="-p 2 -parallel 2" make test
sudo: false
From e7d1668347c751104ccebcd384f9ab742d9e0f55 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Thu, 22 Feb 2018 09:27:55 +0100
Subject: [PATCH 13/16] Change .travis.yml, set parallel to 1 to pass tests
---
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.travis.yml b/.travis.yml
index 7d99c9bd3d..8527e3ee3f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,6 @@ branches:
- master
script:
- - GOTEST_FLAGS="-p 2 -parallel 2" make test
+ - GOTEST_FLAGS="-p 2 -parallel 1" make test
sudo: false
From 7ff20fc4be637634045c3c4c2137ac42d7b9d99a Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Thu, 22 Feb 2018 14:16:24 +0100
Subject: [PATCH 14/16] Revert "Change .travis.yml, set parallel to 1 to pass
tests"
This reverts commit e7d1668347c751104ccebcd384f9ab742d9e0f55.
---
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.travis.yml b/.travis.yml
index 8527e3ee3f..7d99c9bd3d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,6 @@ branches:
- master
script:
- - GOTEST_FLAGS="-p 2 -parallel 1" make test
+ - GOTEST_FLAGS="-p 2 -parallel 2" make test
sudo: false
From 360dc1dd8d406bf989769393f3e31aefceab9cc3 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Thu, 1 Mar 2018 14:09:36 +0100
Subject: [PATCH 15/16] Simplified error handling for maxIndexForService
* added unit tests to ensure service index is properly garbage collected
* added Upgrade from Version 1.0.6 to higher section in documentation
---
agent/consul/state/catalog.go | 37 ++++++++-------------------
agent/consul/state/catalog_test.go | 24 +++++++++++++++++
website/source/docs/upgrading.html.md | 12 +++++++++
3 files changed, 47 insertions(+), 26 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 454b340821..38d59e8109 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -761,19 +761,19 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
// maxIndexForService return the maximum Raft Index for a service
// If the index is not set for the service, it will return:
-// - maxIndex(nodes, services) if checks if false
-// - maxIndex(nodes, services, checks) if checks if false
-func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) (uint64, error) {
+// - maxIndex(nodes, services) if checks is false
+// - maxIndex(nodes, services, checks) if checks is true
+func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
if err == nil {
if idx, ok := transaction.(*IndexEntry); ok {
- return idx.Value, nil
+ return idx.Value
}
}
if checks {
- return maxIndexTxn(tx, "nodes", "services", "checks"), nil
+ return maxIndexTxn(tx, "nodes", "services", "checks")
}
- return maxIndexTxn(tx, "nodes", "services"), nil
+ return maxIndexTxn(tx, "nodes", "services")
}
// ServiceNodes returns the nodes associated with a given service name.
@@ -782,10 +782,7 @@ func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, str
defer tx.Abort()
// Get the table index.
- idx, err := maxIndexForService(tx, serviceName, false)
- if err != nil {
- panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
- }
+ idx := maxIndexForService(tx, serviceName, false)
// List all the services.
services, err := tx.Get("services", "service", serviceName)
if err != nil {
@@ -813,10 +810,7 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
defer tx.Abort()
// Get the table index.
- idx, err := maxIndexForService(tx, service, false)
- if err != nil {
- panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err))
- }
+ idx := maxIndexForService(tx, service, false)
// List all the services.
services, err := tx.Get("services", "service", service)
@@ -1267,10 +1261,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
defer tx.Abort()
// Get the table index.
- idx, err := maxIndexForService(tx, serviceName, true)
- if err != nil {
- panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
- }
+ idx := maxIndexForService(tx, serviceName, true)
// Return the checks.
iter, err := tx.Get("checks", "service", serviceName)
if err != nil {
@@ -1449,10 +1440,7 @@ func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
defer tx.Abort()
// Get the table index.
- idx, err := maxIndexForService(tx, serviceName, true)
- if err != nil {
- panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
- }
+ idx := maxIndexForService(tx, serviceName, true)
// Query the state store for the service.
iter, err := tx.Get("services", "service", serviceName)
@@ -1476,10 +1464,7 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string)
defer tx.Abort()
// Get the table index.
- idx, err := maxIndexForService(tx, serviceName, true)
- if err != nil {
- panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err))
- }
+ idx := maxIndexForService(tx, serviceName, true)
// Query the state store for the service.
iter, err := tx.Get("services", "service", serviceName)
diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go
index bad625b496..b22b82fed6 100644
--- a/agent/consul/state/catalog_test.go
+++ b/agent/consul/state/catalog_test.go
@@ -2180,6 +2180,25 @@ func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID s
}
}
+// Ensure index exist, if expectedIndex = -1, ensure the index does not exists
+func ensureIndexForService(t *testing.T, s *Store, ws memdb.WatchSet, serviceName string, expectedIndex uint64) {
+ t.Helper()
+ tx := s.db.Txn(false)
+ defer tx.Abort()
+ transaction, err := tx.First("index", "id", fmt.Sprintf("service.%s", serviceName))
+ if err == nil {
+ if idx, ok := transaction.(*IndexEntry); ok {
+ if expectedIndex != idx.Value {
+ t.Fatalf("Expected index %d, but had %d for %s", expectedIndex, idx.Value, serviceName)
+ }
+ return
+ }
+ }
+ if expectedIndex != 0 {
+ t.Fatalf("Index for %s was expected but not found", serviceName)
+ }
+}
+
// TestIndexIndependance test that changes on a given service does not impact the
// index of other services. It allows to have huge benefits for watches since
// watchers are notified ONLY when there are changes in the given service
@@ -2247,8 +2266,10 @@ func TestIndexIndependance(t *testing.T) {
s.DeleteCheck(15, "node2", types.CheckID("check_service_shared"))
ensureServiceVersion(t, s, ws, "service_shared", 15, 2)
+ ensureIndexForService(t, s, ws, "service_shared", 15)
s.DeleteService(16, "node2", "service_shared")
ensureServiceVersion(t, s, ws, "service_shared", 16, 1)
+ ensureIndexForService(t, s, ws, "service_shared", 16)
s.DeleteService(17, "node1", "service_shared")
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)
@@ -2257,9 +2278,12 @@ func TestIndexIndependance(t *testing.T) {
// The behaviour is the same as all non-existing services, meaning
// we properly did collect garbage
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)
+ // No index should exist anymore, it must have been garbage collected
+ ensureIndexForService(t, s, ws, "service_shared", 0)
if !watchFired(ws) {
t.Fatalf("bad")
}
+
}
func TestStateStore_CheckServiceNodes(t *testing.T) {
diff --git a/website/source/docs/upgrading.html.md b/website/source/docs/upgrading.html.md
index 12ee090670..704901dc74 100644
--- a/website/source/docs/upgrading.html.md
+++ b/website/source/docs/upgrading.html.md
@@ -36,6 +36,18 @@ Consul is A, and version B is released.
by running `consul members` to make sure all members have the latest
build and highest protocol version.
+## Upgrade from Version 1.0.6 to higher
+
+In version 1.0.7 and higher, when requesting a specific service (/health or
+/catalog endpoints), the X-Consul-Index returned is now the index at which the
+service has been modified, not the global Raft Index of all services.
+
+Thus, if several versions of Consul are pre 1.0.7 and post 1.0.7, (ie: during an
+upgrade) it is possible to have a lower X-Consul-Index returned than the previous
+X-Consul-Index issued for this service.
+
+It should not be an issue unless library code does issue an error if it expects
+X-Consul-Index to be strictly increasing.
## Backward Incompatible Upgrades
From 7b81e2c3ad133ef5c460dd642ab44dd07bf08c8a Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Fri, 2 Mar 2018 09:08:00 +0100
Subject: [PATCH 16/16] Better information and advices for upgrade to 1.0.7+
---
website/source/docs/upgrading.html.md | 24 ++++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/website/source/docs/upgrading.html.md b/website/source/docs/upgrading.html.md
index 704901dc74..52fc572525 100644
--- a/website/source/docs/upgrading.html.md
+++ b/website/source/docs/upgrading.html.md
@@ -38,16 +38,24 @@ Consul is A, and version B is released.
## Upgrade from Version 1.0.6 to higher
-In version 1.0.7 and higher, when requesting a specific service (/health or
-/catalog endpoints), the X-Consul-Index returned is now the index at which the
-service has been modified, not the global Raft Index of all services.
+In version 1.0.7 and higher, when requesting a specific service
+(`/v1/health/:service` or `/v1/catalog/:service` endpoints), the
+`X-Consul-Index` returned is now the index at which that specific service was
+last modified.
+In version 1.0.6 and earlier the X-Consul-Index returned was the index at
+which any service was last modified. See
+[GH-3890](https://github.com/hashicorp/consul/issues/3890) for more details.
-Thus, if several versions of Consul are pre 1.0.7 and post 1.0.7, (ie: during an
-upgrade) it is possible to have a lower X-Consul-Index returned than the previous
-X-Consul-Index issued for this service.
+During upgrades from 1.0.6 or lower to 1.0.7 or higher, watchers are likely to
+see `X-Consul-Index` for these endpoints decrease between blocking calls.
-It should not be an issue unless library code does issue an error if it expects
-X-Consul-Index to be strictly increasing.
+Consul’s watch feature and consul-template should gracefully handle this case.
+Other tools relying on blocking service or health queries are also likely to
+work; some may require a restart. It is possible external tools could break and
+either stop working or continually re-request data without blocking if they
+have assumed indexes can never decrease or be reset and/or persist index
+values. Please test any blocking query integrations in a controlled environment
+before proceeding.
## Backward Incompatible Upgrades