From b839f521958df4c7d48a738a0ae8de1ba6122ac8 Mon Sep 17 00:00:00 2001 From: Aestek Date: Tue, 18 Jun 2019 15:06:29 +0200 Subject: [PATCH] kv: do not trigger watches when setting the same value (#5885) If a KVSet is performed but does not update the entry, do not trigger watches for this key. This avoids releasing blocking queries for KV values that did not actually changed. --- agent/consul/state/kvs.go | 12 +++++++++--- agent/consul/state/kvs_test.go | 19 ++++++++++++++++++- agent/structs/structs.go | 8 ++++++++ api/lock_test.go | 2 ++ 4 files changed, 37 insertions(+), 4 deletions(-) diff --git a/agent/consul/state/kvs.go b/agent/consul/state/kvs.go index 070a2fc815..eea081c91d 100644 --- a/agent/consul/state/kvs.go +++ b/agent/consul/state/kvs.go @@ -131,14 +131,15 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error { // whatever the existing session is. func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { // Retrieve an existing KV pair - existing, err := tx.First("kvs", "id", entry.Key) + existingNode, err := tx.First("kvs", "id", entry.Key) if err != nil { return fmt.Errorf("failed kvs lookup: %s", err) } + existing, _ := existingNode.(*structs.DirEntry) // Set the indexes. if existing != nil { - entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex + entry.CreateIndex = existing.CreateIndex } else { entry.CreateIndex = idx } @@ -148,12 +149,17 @@ func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, up // session for a new entry is "no session". if !updateSession { if existing != nil { - entry.Session = existing.(*structs.DirEntry).Session + entry.Session = existing.Session } else { entry.Session = "" } } + // skip write if the entry did not change + if existing != nil && existing.Equal(entry) { + return nil + } + // Store the kv pair in the state store and update the index. if err := tx.Insert("kvs", entry); err != nil { return fmt.Errorf("failed inserting kvs entry: %s", err) diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index 1ff2d6394f..e9f088c6df 100644 --- a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" ) @@ -357,6 +359,21 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) { if result != nil || err != nil || idx != 8 { t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) } + + // setting the same value again does not update the index + ws = memdb.NewWatchSet() + // Write a new K/V entry to the store. + entry = &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + } + require.Nil(t, s.KVSSet(1, entry)) + require.Nil(t, s.KVSSet(2, entry)) + + idx, _, err = s.KVSGet(ws, entry.Key) + require.Nil(t, err) + + require.Equal(t, uint64(1), idx) } func TestStateStore_KVSList(t *testing.T) { @@ -1430,7 +1447,7 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) { } // Verify the snapshot. - if idx := snap.LastIndex(); idx != 7 { + if idx := snap.LastIndex(); idx != 6 { t.Fatalf("bad index: %d", idx) } iter, err := snap.KVs() diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 62cdfe5ae6..4df742d801 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1358,6 +1358,14 @@ func (d *DirEntry) Clone() *DirEntry { } } +func (d *DirEntry) Equal(o *DirEntry) bool { + return d.LockIndex == o.LockIndex && + d.Key == o.Key && + d.Flags == o.Flags && + bytes.Equal(d.Value, o.Value) && + d.Session == o.Session +} + type DirEntries []*DirEntry // KVSRequest is used to operate on the Key-Value store diff --git a/api/lock_test.go b/api/lock_test.go index a0d9776725..c97676e930 100644 --- a/api/lock_test.go +++ b/api/lock_test.go @@ -480,6 +480,7 @@ func TestAPI_LockMonitorRetry(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + pair.Value = []byte{1} if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { t.Fatalf("err: %v", err) } @@ -496,6 +497,7 @@ func TestAPI_LockMonitorRetry(t *testing.T) { mutex.Lock() errors = 10 mutex.Unlock() + pair.Value = []byte{2} if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { t.Fatalf("err: %v", err) }