diff --git a/consul/state_store.go b/consul/state_store.go index d66c76d92e..b0496dcb65 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -705,6 +705,69 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e return nodes } +// KVSSet is used to create or update a KV entry +func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + // Get the existing node + res, err := s.kvsTable.GetTxn(tx, "id", d.Key) + if err != nil { + return err + } + + // Set the create and modify times + if len(res) == 0 { + d.CreateIndex = index + } else { + d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex + } + d.ModifyIndex = index + + if err := s.kvsTable.InsertTxn(tx, d); err != nil { + return err + } + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.kvsTable].Notify() + return tx.Commit() +} + +// KVSGet is used to get a KV entry +func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { + idx, res, err := s.kvsTable.Get("id", key) + var d *structs.DirEntry + if len(res) > 0 { + d = res[0].(*structs.DirEntry) + } + return idx, d, err +} + +// KVSList is used to list all KV entries with a prefix +func (s *StateStore) KVSList() (uint64, structs.DirEntries, error) { + return 0, nil, nil +} + +// KVSDelete is used to delete a KVS entry +func (s *StateStore) KVSDelete() error { + return nil +} + +// KVSDeleteTree is used to delete all keys with a given prefix +func (s *StateStore) KVSDeleteTree() error { + return nil +} + +// KVSCheckAndSet is used to perform an atomic check-and-set +func (s *StateStore) KVSCheckAndSet() error { + return nil +} + // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { // Begin a new txn on all tables diff --git a/consul/state_store_test.go b/consul/state_store_test.go index de5ccd8058..87fd0ac02c 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -933,3 +933,83 @@ func TestSS_Register_Deregister_Query(t *testing.T) { t.Fatalf("Bad: %v", nodes) } } + +func TestKVSSet_Get(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Should not exist + idx, d, err := store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 0 { + t.Fatalf("bad: %v", idx) + } + if d != nil { + t.Fatalf("bad: %v", d) + } + + // Create the entry + d = &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should exist exist + idx, d, err = store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if d.CreateIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.ModifyIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.Key != "/foo" { + t.Fatalf("bad: %v", d) + } + if d.Flags != 42 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "test" { + t.Fatalf("bad: %v", d) + } + + // Update the entry + d = &structs.DirEntry{Key: "/foo", Flags: 43, Value: []byte("zip")} + if err := store.KVSSet(1010, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should update + idx, d, err = store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1010 { + t.Fatalf("bad: %v", idx) + } + if d.CreateIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.ModifyIndex != 1010 { + t.Fatalf("bad: %v", d) + } + if d.Key != "/foo" { + t.Fatalf("bad: %v", d) + } + if d.Flags != 43 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } +}