kv: do not trigger watches when setting the same value (#5885)

If a KVSet is performed but does not update the entry, do not trigger
watches for this key.
This avoids releasing blocking queries for KV values that did not
actually changed.
This commit is contained in:
Aestek 2019-06-18 15:06:29 +02:00 committed by Hans Hasselberg
parent f3d9b999ee
commit b839f52195
4 changed files with 37 additions and 4 deletions

View File

@ -131,14 +131,15 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
// whatever the existing session is. // whatever the existing session is.
func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair // Retrieve an existing KV pair
existing, err := tx.First("kvs", "id", entry.Key) existingNode, err := tx.First("kvs", "id", entry.Key)
if err != nil { if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err) return fmt.Errorf("failed kvs lookup: %s", err)
} }
existing, _ := existingNode.(*structs.DirEntry)
// Set the indexes. // Set the indexes.
if existing != nil { if existing != nil {
entry.CreateIndex = existing.(*structs.DirEntry).CreateIndex entry.CreateIndex = existing.CreateIndex
} else { } else {
entry.CreateIndex = idx entry.CreateIndex = idx
} }
@ -148,12 +149,17 @@ func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, up
// session for a new entry is "no session". // session for a new entry is "no session".
if !updateSession { if !updateSession {
if existing != nil { if existing != nil {
entry.Session = existing.(*structs.DirEntry).Session entry.Session = existing.Session
} else { } else {
entry.Session = "" entry.Session = ""
} }
} }
// skip write if the entry did not change
if existing != nil && existing.Equal(entry) {
return nil
}
// Store the kv pair in the state store and update the index. // Store the kv pair in the state store and update the index.
if err := tx.Insert("kvs", entry); err != nil { if err := tx.Insert("kvs", entry); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err) return fmt.Errorf("failed inserting kvs entry: %s", err)

View File

@ -6,6 +6,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
@ -357,6 +359,21 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
if result != nil || err != nil || idx != 8 { if result != nil || err != nil || idx != 8 {
t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err) t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
} }
// setting the same value again does not update the index
ws = memdb.NewWatchSet()
// Write a new K/V entry to the store.
entry = &structs.DirEntry{
Key: "foo",
Value: []byte("bar"),
}
require.Nil(t, s.KVSSet(1, entry))
require.Nil(t, s.KVSSet(2, entry))
idx, _, err = s.KVSGet(ws, entry.Key)
require.Nil(t, err)
require.Equal(t, uint64(1), idx)
} }
func TestStateStore_KVSList(t *testing.T) { func TestStateStore_KVSList(t *testing.T) {
@ -1430,7 +1447,7 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
} }
// Verify the snapshot. // Verify the snapshot.
if idx := snap.LastIndex(); idx != 7 { if idx := snap.LastIndex(); idx != 6 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
iter, err := snap.KVs() iter, err := snap.KVs()

View File

@ -1358,6 +1358,14 @@ func (d *DirEntry) Clone() *DirEntry {
} }
} }
func (d *DirEntry) Equal(o *DirEntry) bool {
return d.LockIndex == o.LockIndex &&
d.Key == o.Key &&
d.Flags == o.Flags &&
bytes.Equal(d.Value, o.Value) &&
d.Session == o.Session
}
type DirEntries []*DirEntry type DirEntries []*DirEntry
// KVSRequest is used to operate on the Key-Value store // KVSRequest is used to operate on the Key-Value store

View File

@ -480,6 +480,7 @@ func TestAPI_LockMonitorRetry(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
pair.Value = []byte{1}
if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -496,6 +497,7 @@ func TestAPI_LockMonitorRetry(t *testing.T) {
mutex.Lock() mutex.Lock()
errors = 10 errors = 10
mutex.Unlock() mutex.Unlock()
pair.Value = []byte{2}
if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }