diff --git a/command/agent/http.go b/command/agent/http.go index 33e84601ae..40e97da204 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -88,6 +88,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService)) + s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint)) + if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) diff --git a/command/agent/http_api.md b/command/agent/http_api.md index 1ac2f5ae33..ff379faac6 100644 --- a/command/agent/http_api.md +++ b/command/agent/http_api.md @@ -8,6 +8,7 @@ register new services. The URLs are also versioned to allow for changes in the API. The current URLs supported are: +Catalog: * /v1/catalog/register : Registers a new service * /v1/catalog/deregister : Deregisters a service or node * /v1/catalog/datacenters : Lists known datacenters @@ -16,15 +17,17 @@ The current URLs supported are: * /v1/catalog/service// : Lists the nodes in a given service * /v1/catalog/node// : Lists the services provided by a node -* Health system: +Health system: * /v1/health/node/: Returns the health info of a node * /v1/health/checks/: Returns the checks of a service * /v1/health/service/: Returns the nodes and health info of a service * /v1/health/state/: Returns the checks in a given state +Status: * /v1/status/leader : Returns the current Raft leader * /v1/status/peers : Returns the current Raft peer set +Agent: * /v1/agent/checks: Returns the checks the local agent is managing * /v1/agent/services : Returns the services local agent is managing * /v1/agent/members : Returns the members as seen by the local serf agent @@ -37,3 +40,7 @@ The current URLs supported are: * /v1/agent/check/fail/ * /v1/agent/service/register * /v1/agent/service/deregister/ + +KVS: +* /v1/kv/ + diff --git a/command/agent/kvs_endpoint.go b/command/agent/kvs_endpoint.go new file mode 100644 index 0000000000..af97aa5c16 --- /dev/null +++ b/command/agent/kvs_endpoint.go @@ -0,0 +1,153 @@ +package agent + +import ( + "bytes" + "github.com/hashicorp/consul/consul/structs" + "io" + "net/http" + "strconv" + "strings" +) + +func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Set default DC + args := structs.KeyRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done { + return nil, nil + } + + // Pull out the key name, validation left to each sub-handler + args.Key = strings.TrimPrefix(req.URL.Path, "/v1/kv/") + + // Switch on the method + switch req.Method { + case "GET": + return s.KVSGet(resp, req, &args) + case "PUT": + return s.KVSPut(resp, req, &args) + case "DELETE": + return s.KVSDelete(resp, req, &args) + default: + resp.WriteHeader(405) + return nil, nil + } + return nil, nil +} + +// KVSGet handles a GET request +func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) { + // Check for recurse + method := "KVS.Get" + params := req.URL.Query() + if _, ok := params["recurse"]; ok { + method = "KVS.List" + } else if missingKey(resp, args) { + return nil, nil + } + + // Make the RPC + var out structs.IndexedDirEntries + if err := s.agent.RPC(method, &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + + // Check if we get a not found + if len(out.Entries) == 0 { + resp.WriteHeader(404) + return nil, nil + } + return out.Entries, nil +} + +// KVSPut handles a PUT request +func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) { + if missingKey(resp, args) { + return nil, nil + } + applyReq := structs.KVSRequest{ + Datacenter: args.Datacenter, + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: args.Key, + Flags: 0, + Value: nil, + }, + } + + // Check for flags + params := req.URL.Query() + if _, ok := params["flags"]; ok { + flagVal, err := strconv.ParseUint(params.Get("flags"), 10, 64) + if err != nil { + return nil, err + } + applyReq.DirEnt.Flags = flagVal + } + + // Check for cas value + if _, ok := params["cas"]; ok { + casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64) + if err != nil { + return nil, err + } + applyReq.DirEnt.ModifyIndex = casVal + applyReq.Op = structs.KVSCAS + } + + // Copy the value + buf := bytes.NewBuffer(nil) + if _, err := io.Copy(buf, req.Body); err != nil { + return nil, err + } + applyReq.DirEnt.Value = buf.Bytes() + + // Make the RPC + var out bool + if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil { + return nil, err + } + + // Only use the out value if this was a CAS + if applyReq.Op == structs.KVSSet { + return true, nil + } else { + return out, nil + } +} + +// KVSPut handles a DELETE request +func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) { + applyReq := structs.KVSRequest{ + Datacenter: args.Datacenter, + Op: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: args.Key, + }, + } + + // Check for recurse + params := req.URL.Query() + if _, ok := params["recurse"]; ok { + applyReq.Op = structs.KVSDeleteTree + } else if missingKey(resp, args) { + return nil, nil + } + + // Make the RPC + var out bool + if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil { + return nil, err + } + return nil, nil +} + +// missingKey checks if the key is missing +func missingKey(resp http.ResponseWriter, args *structs.KeyRequest) bool { + if args.Key == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing key name")) + return true + } + return false +} diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go new file mode 100644 index 0000000000..6bb476e0a8 --- /dev/null +++ b/command/agent/kvs_endpoint_test.go @@ -0,0 +1,291 @@ +package agent + +import ( + "bytes" + "fmt" + "github.com/hashicorp/consul/consul/structs" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + keys := []string{ + "baz", + "bar", + "foo/sub1", + "foo/sub2", + "zip", + } + + for _, key := range keys { + buf := bytes.NewBuffer([]byte("test")) + req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + for _, key := range keys { + req, err := http.NewRequest("GET", "/v1/kv/"+key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + header := resp.Header().Get("X-Consul-Index") + if header == "" { + t.Fatalf("Bad: %v", header) + } + + res, ok := obj.(structs.DirEntries) + if !ok { + t.Fatalf("should work") + } + + if len(res) != 1 { + t.Fatalf("bad: %v", res) + } + + if res[0].Key != key { + t.Fatalf("bad: %v", res) + } + } + + for _, key := range keys { + req, err := http.NewRequest("DELETE", "/v1/kv/"+key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + } +} + +func TestKVSEndpoint_Recurse(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + keys := []string{ + "bar", + "baz", + "foo/sub1", + "foo/sub2", + "zip", + } + + for _, key := range keys { + buf := bytes.NewBuffer([]byte("test")) + req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + { + // Get all the keys + req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + header := resp.Header().Get("X-Consul-Index") + if header == "" { + t.Fatalf("Bad: %v", header) + } + + res, ok := obj.(structs.DirEntries) + if !ok { + t.Fatalf("should work") + } + + if len(res) != len(keys) { + t.Fatalf("bad: %v", res) + } + + for idx, key := range keys { + if res[idx].Key != key { + t.Fatalf("bad: %v %v", res[idx].Key, key) + } + } + } + + { + req, err := http.NewRequest("DELETE", "/v1/kv/?recurse", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + _, err = srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + } + + { + // Get all the keys + req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if obj != nil { + t.Fatalf("bad: %v", obj) + } + } +} + +func TestKVSEndpoint_CAS(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + { + buf := bytes.NewBuffer([]byte("test")) + req, err := http.NewRequest("PUT", "/v1/kv/test?flags=50", buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + req, err := http.NewRequest("GET", "/v1/kv/test", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + d := obj.(structs.DirEntries)[0] + + // Check the flags + if d.Flags != 50 { + t.Fatalf("bad: %v", d) + } + + // Create a CAS request, bad index + { + buf := bytes.NewBuffer([]byte("zip")) + req, err := http.NewRequest("PUT", + fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex-1), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); res { + t.Fatalf("should NOT work") + } + } + + // Create a CAS request, good index + { + buf := bytes.NewBuffer([]byte("zip")) + req, err := http.NewRequest("PUT", + fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex), buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.KVSEndpoint(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + if res := obj.(bool); !res { + t.Fatalf("should work") + } + } + + // Verify the update + req, _ = http.NewRequest("GET", "/v1/kv/test", nil) + resp = httptest.NewRecorder() + obj, _ = srv.KVSEndpoint(resp, req) + d = obj.(structs.DirEntries)[0] + + if d.Flags != 42 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } +} diff --git a/consul/fsm.go b/consul/fsm.go index 4646f7e4f1..22854729fd 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -65,6 +65,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.decodeRegister(buf[1:], log.Index) case structs.DeregisterRequestType: return c.applyDeregister(buf[1:], log.Index) + case structs.KVSRequestType: + return c.applyKVSOperation(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } @@ -131,6 +133,32 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { return nil } +func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { + var req structs.KVSRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + switch req.Op { + case structs.KVSSet: + return c.state.KVSSet(index, &req.DirEnt) + case structs.KVSDelete: + return c.state.KVSDelete(index, req.DirEnt.Key) + case structs.KVSDeleteTree: + return c.state.KVSDeleteTree(index, req.DirEnt.Key) + case structs.KVSCAS: + act, err := c.state.KVSCheckAndSet(index, &req.DirEnt) + if err != nil { + return err + } else { + return act + } + default: + c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op) + return fmt.Errorf("Invalid KVS operation '%s'", req.Op) + } + return nil +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) @@ -152,6 +180,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { if err != nil { return err } + c.state.Close() c.state = state // Create a decoder @@ -184,6 +213,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } c.applyRegister(&req, header.LastIndex) + case structs.KVSRequestType: + var req structs.DirEntry + if err := dec.Decode(&req); err != nil { + return err + } + if err := c.state.KVSRestore(&req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -247,6 +285,38 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { } } } + + // Enable GC of the ndoes + nodes = nil + + // Dump the KVS entries + streamCh := make(chan interface{}, 256) + errorCh := make(chan error) + go func() { + if err := s.state.KVSDump(streamCh); err != nil { + errorCh <- err + } + }() + +OUTER: + for { + select { + case raw := <-streamCh: + if raw == nil { + break OUTER + } + sink.Write([]byte{byte(structs.KVSRequestType)}) + if err := encoder.Encode(raw); err != nil { + sink.Cancel() + return err + } + + case err := <-errorCh: + sink.Cancel() + return err + } + } + return nil } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 5c210db06e..24f7c7cbbf 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -322,6 +322,10 @@ func TestFSM_SnapshotRestore(t *testing.T) { Status: structs.HealthPassing, ServiceID: "web", }) + fsm.state.KVSSet(8, &structs.DirEntry{ + Key: "/test", + Value: []byte("foo"), + }) // Snapshot snap, err := fsm.Snapshot() @@ -370,4 +374,198 @@ func TestFSM_SnapshotRestore(t *testing.T) { if len(checks) != 1 { t.Fatalf("Bad: %v", checks) } + + // Verify key is set + _, d, err := fsm.state.KVSGet("/test") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(d.Value) != "foo" { + t.Fatalf("bad: %v", d) + } +} + +func TestFSM_KVSSet(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("missing") + } +} + +func TestFSM_KVSDelete(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Run the delete + req.Op = structs.KVSDelete + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is not set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("key present") + } +} + +func TestFSM_KVSDeleteTree(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Run the delete tree + req.Op = structs.KVSDeleteTree + req.DirEnt.Key = "/test" + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is not set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d != nil { + t.Fatalf("key present") + } +} + +func TestFSM_KVSCheckAndSet(t *testing.T) { + fsm, err := NewFSM(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + req := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "/test/path", + Flags: 0, + Value: []byte("test"), + }, + } + buf, err := structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify key is set + _, d, err := fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("key missing") + } + + // Run the check-and-set + req.Op = structs.KVSCAS + req.DirEnt.ModifyIndex = d.ModifyIndex + req.DirEnt.Value = []byte("zip") + buf, err = structs.Encode(structs.KVSRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = fsm.Apply(makeLog(buf)) + if resp.(bool) != true { + t.Fatalf("resp: %v", resp) + } + + // Verify key is updated + _, d, err = fsm.state.KVSGet("/test/path") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } } diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go new file mode 100644 index 0000000000..0e884524f3 --- /dev/null +++ b/consul/kvs_endpoint.go @@ -0,0 +1,115 @@ +package consul + +import ( + "fmt" + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/structs" + "time" +) + +// KVS endpoint is used to manipulate the Key-Value store +type KVS struct { + srv *Server +} + +// Apply is used to apply a KVS request to the data store. This should +// only be used for operations that modify the data +func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { + if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now()) + + // Verify the args + if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree { + return fmt.Errorf("Must provide key") + } + + // Apply the update + resp, err := k.srv.raftApply(structs.KVSRequestType, args) + if err != nil { + k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + // Check if the return type is a bool + if respBool, ok := resp.(bool); ok { + *reply = respBool + } + return nil +} + +// Get is used to lookup a single key +func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { + if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done { + return err + } + + // Get the local state + state := k.srv.fsm.State() + return k.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("KVSGet"), + func() (uint64, error) { + index, ent, err := state.KVSGet(args.Key) + if err != nil { + return 0, err + } + if ent == nil { + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if index == 0 { + reply.Index = 1 + } else { + reply.Index = index + } + reply.Entries = nil + } else { + reply.Index = ent.ModifyIndex + reply.Entries = structs.DirEntries{ent} + } + return reply.Index, nil + }) +} + +// List is used to list all keys with a given prefix +func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { + if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done { + return err + } + + // Get the local state + state := k.srv.fsm.State() + return k.srv.blockingRPC(&args.BlockingQuery, + state.QueryTables("KVSList"), + func() (uint64, error) { + index, ent, err := state.KVSList(args.Key) + if err != nil { + return 0, err + } + if len(ent) == 0 { + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if index == 0 { + reply.Index = 1 + } else { + reply.Index = index + } + reply.Entries = nil + } else { + // Determine the maximum affected index + var maxIndex uint64 + for _, e := range ent { + if e.ModifyIndex > maxIndex { + maxIndex = e.ModifyIndex + } + } + + reply.Index = maxIndex + reply.Entries = ent + } + return reply.Index, nil + }) +} diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go new file mode 100644 index 0000000000..d116e82c59 --- /dev/null +++ b/consul/kvs_endpoint_test.go @@ -0,0 +1,173 @@ +package consul + +import ( + "github.com/hashicorp/consul/consul/structs" + "os" + "testing" + "time" +) + +func TestKVS_Apply(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify + state := s1.fsm.State() + _, d, err := state.KVSGet("test") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("should not be nil") + } + + // Do a check and set + arg.Op = structs.KVSCAS + arg.DirEnt.ModifyIndex = d.ModifyIndex + arg.DirEnt.Flags = 43 + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Check this was applied + if out != true { + t.Fatalf("bad: %v", out) + } + + // Verify + _, d, err = state.KVSGet("test") + if err != nil { + t.Fatalf("err: %v", err) + } + if d.Flags != 43 { + t.Fatalf("bad: %v", d) + } +} + +func TestKVS_Get(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + getR := structs.KeyRequest{ + Datacenter: "dc1", + Key: "test", + } + var dirent structs.IndexedDirEntries + if err := client.Call("KVS.Get", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + + if dirent.Index == 0 { + t.Fatalf("Bad: %v", dirent) + } + if len(dirent.Entries) != 1 { + t.Fatalf("Bad: %v", dirent) + } + d := dirent.Entries[0] + if d.Flags != 42 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "test" { + t.Fatalf("bad: %v", d) + } +} + +func TestKVSEndpoint_List(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + // Wait for leader + time.Sleep(100 * time.Millisecond) + + keys := []string{ + "/test/key1", + "/test/key2", + "/test/sub/key3", + } + + for _, key := range keys { + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: key, + Flags: 1, + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + } + + getR := structs.KeyRequest{ + Datacenter: "dc1", + Key: "/test", + } + var dirent structs.IndexedDirEntries + if err := client.Call("KVS.List", &getR, &dirent); err != nil { + t.Fatalf("err: %v", err) + } + + if dirent.Index == 0 { + t.Fatalf("Bad: %v", dirent) + } + if len(dirent.Entries) != 3 { + t.Fatalf("Bad: %v", dirent.Entries) + } + for i := 0; i < len(dirent.Entries); i++ { + d := dirent.Entries[i] + if d.Key != keys[i] { + t.Fatalf("bad: %v", d) + } + if d.Flags != 1 { + t.Fatalf("bad: %v", d) + } + if d.Value != nil { + t.Fatalf("bad: %v", d) + } + } +} diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 966024c06f..eeead56c34 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -49,10 +49,13 @@ type MDBIndex struct { Unique bool // Controls if values are unique Fields []string // Fields are used to build the index IdxFunc IndexFunc // Can be used to provide custom indexing + Virtual bool // Virtual index does not exist, but can be used for queries + RealIndex string // Virtual indexes use a RealIndex for iteration - table *MDBTable - name string - dbiName string + table *MDBTable + name string + dbiName string + realIndex *MDBIndex } // MDBTxn is used to wrap an underlying transaction @@ -88,6 +91,17 @@ func DefaultIndexFunc(idx *MDBIndex, parts []string) string { return prefix } +// DefaultIndexPrefixFunc can be used with DefaultIndexFunc to scan +// for index prefix values. This should only be used as part of a +// virtual index. +func DefaultIndexPrefixFunc(idx *MDBIndex, parts []string) string { + if len(parts) == 0 { + return "_" + } + prefix := "_" + strings.Join(parts, "||") + return prefix +} + // Init is used to initialize the MDBTable and ensure it's ready func (t *MDBTable) Init() error { if t.Env == nil { @@ -111,6 +125,9 @@ func (t *MDBTable) Init() error { if id.AllowBlank { return fmt.Errorf("id index must not allow blanks") } + if id.Virtual { + return fmt.Errorf("id index cannot be virtual") + } // Create the table if err := t.createTable(); err != nil { @@ -221,6 +238,9 @@ EXTEND: mdbTxn.dbis[t.Name] = dbi for _, index := range t.Indexes { + if index.Virtual { + continue + } dbi, err := index.openDBI(tx) if err != nil { tx.Abort() @@ -237,6 +257,9 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) { // Construct the indexes keys indexes := make(map[string][]byte) for name, index := range t.Indexes { + if index.Virtual { + continue + } key, err := index.keyFromObject(obj) if err != nil { return nil, err @@ -301,6 +324,9 @@ AFTER_DELETE: // Insert the new indexes for name, index := range t.Indexes { + if index.Virtual { + continue + } dbi := tx.dbis[index.dbiName] if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil { return err @@ -350,6 +376,29 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac return results, err } +// StreamTxn is like GetTxn but it streams the results over a channel. +// This can be used if the expected data set is very large. The stream +// is always closed on return. +func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error { + // Always close the stream on return + defer close(stream) + + // Get the associated index + idx, key, err := t.getIndex(index, parts) + if err != nil { + return err + } + + // Stream the results + err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + obj := t.Decoder(res) + stream <- obj + return false + }) + + return err +} + // getIndex is used to get the proper index, and also check the arity func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) { // Get the index @@ -427,6 +476,12 @@ func (t *MDBTable) deleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (num i if name == idx.name { continue } + if idx.Virtual && name == idx.RealIndex { + continue + } + if otherIdx.Virtual { + continue + } dbi := tx.dbis[otherIdx.dbiName] if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil { panic(err) @@ -464,11 +519,23 @@ func (i *MDBIndex) init(table *MDBTable, name string) error { if err := i.createIndex(); err != nil { return err } + // Verify real index exists + if i.Virtual { + if realIndex, ok := table.Indexes[i.RealIndex]; !ok { + return fmt.Errorf("real index '%s' missing", i.RealIndex) + } else { + i.realIndex = realIndex + } + } return nil } // createIndex is used to ensure the index exists func (i *MDBIndex) createIndex() error { + // Do not create if this is a virtual index + if i.Virtual { + return nil + } tx, err := i.table.Env.BeginTxn(nil, 0) if err != nil { return err @@ -529,7 +596,14 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte { func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, cb func(encRowId, res []byte) bool) error { table := tx.dbis[i.table.Name] - dbi := tx.dbis[i.dbiName] + + // If virtual, use the correct DBI + var dbi mdb.DBI + if i.Virtual { + dbi = tx.dbis[i.realIndex.dbiName] + } else { + dbi = tx.dbis[i.dbiName] + } cursor, err := tx.tx.CursorOpen(dbi) if err != nil { diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index 13bfff5b19..d57a9bd6b2 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -781,3 +781,194 @@ func TestMDBTableDelete_Prefix(t *testing.T) { t.Fatalf("expect 2 result: %#v", res) } } + +func TestMDBTableVirtualIndex(t *testing.T) { + dir, env := testMDBEnv(t) + defer os.RemoveAll(dir) + defer env.Close() + + table := &MDBTable{ + Env: env, + Name: "test", + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"First"}, + }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"First"}, + IdxFunc: DefaultIndexPrefixFunc, + }, + }, + Encoder: MockEncoder, + Decoder: MockDecoder, + } + if err := table.Init(); err != nil { + t.Fatalf("err: %v", err) + } + + if table.lastRowID != 0 { + t.Fatalf("bad last row id: %d", table.lastRowID) + } + + objs := []*MockData{ + &MockData{ + Key: "1", + First: "Jack", + Last: "Smith", + Country: "USA", + }, + &MockData{ + Key: "2", + First: "John", + Last: "Wang", + Country: "USA", + }, + &MockData{ + Key: "3", + First: "James", + Last: "Torres", + Country: "Mexico", + }, + } + + // Insert some mock objects + for idx, obj := range objs { + if err := table.Insert(obj); err != nil { + t.Fatalf("err: %v", err) + } + if err := table.SetLastIndex(uint64(4 * idx)); err != nil { + t.Fatalf("err: %v", err) + } + } + + if table.lastRowID != 3 { + t.Fatalf("bad last row id: %d", table.lastRowID) + } + + if idx, _ := table.LastIndex(); idx != 8 { + t.Fatalf("bad last idx: %d", idx) + } + + _, res, err := table.Get("id_prefix", "J") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 3 { + t.Fatalf("expect 3 result: %#v", res) + } + + _, res, err = table.Get("id_prefix", "Ja") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 2 { + t.Fatalf("expect 2 result: %#v", res) + } + + num, err := table.Delete("id_prefix", "Ja") + if err != nil { + t.Fatalf("err: %v", err) + } + if num != 2 { + t.Fatalf("expect 2 result: %#v", num) + } + + _, res, err = table.Get("id_prefix", "J") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 1 { + t.Fatalf("expect 1 result: %#v", res) + } +} + +func TestMDBTableStream(t *testing.T) { + dir, env := testMDBEnv(t) + defer os.RemoveAll(dir) + defer env.Close() + + table := &MDBTable{ + Env: env, + Name: "test", + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "name": &MDBIndex{ + Fields: []string{"First", "Last"}, + }, + "country": &MDBIndex{ + Fields: []string{"Country"}, + }, + }, + Encoder: MockEncoder, + Decoder: MockDecoder, + } + if err := table.Init(); err != nil { + t.Fatalf("err: %v", err) + } + + objs := []*MockData{ + &MockData{ + Key: "1", + First: "Kevin", + Last: "Smith", + Country: "USA", + }, + &MockData{ + Key: "2", + First: "Kevin", + Last: "Wang", + Country: "USA", + }, + &MockData{ + Key: "3", + First: "Bernardo", + Last: "Torres", + Country: "Mexico", + }, + } + + // Insert some mock objects + for idx, obj := range objs { + if err := table.Insert(obj); err != nil { + t.Fatalf("err: %v", err) + } + if err := table.SetLastIndex(uint64(idx + 1)); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Start a readonly txn + tx, err := table.StartTxn(true, nil) + if err != nil { + panic(err) + } + defer tx.Abort() + + // Stream the records + streamCh := make(chan interface{}) + go func() { + if err := table.StreamTxn(streamCh, tx, "id"); err != nil { + t.Fatalf("err: %v", err) + } + }() + + // Verify we get them all + idx := 0 + for obj := range streamCh { + p := obj.(*MockData) + if !reflect.DeepEqual(p, objs[idx]) { + t.Fatalf("bad: %#v %#v", p, objs[idx]) + } + idx++ + } + + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } +} diff --git a/consul/raft_endpoint.go b/consul/raft_endpoint.go index 9db31202c9..97b7adf746 100644 --- a/consul/raft_endpoint.go +++ b/consul/raft_endpoint.go @@ -18,3 +18,8 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error { future := r.server.raft.RemovePeer(peer) return future.Error() } + +func (r *Raft) Snapshot(args struct{}, reply *struct{}) error { + future := r.server.raft.Snapshot() + return future.Error() +} diff --git a/consul/server.go b/consul/server.go index ccd28e9811..9a4a6a59f0 100644 --- a/consul/server.go +++ b/consul/server.go @@ -101,6 +101,7 @@ type endpoints struct { Health *Health Raft *Raft Status *Status + KVS *KVS } // NewServer is used to construct a new Consul server from the @@ -276,12 +277,14 @@ func (s *Server) setupRPC() error { s.endpoints.Raft = &Raft{s} s.endpoints.Catalog = &Catalog{s} s.endpoints.Health = &Health{s} + s.endpoints.KVS = &KVS{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Raft) s.rpcServer.Register(s.endpoints.Catalog) s.rpcServer.Register(s.endpoints.Health) + s.rpcServer.Register(s.endpoints.KVS) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/consul/state_store.go b/consul/state_store.go index cf46b8adde..4b08c04640 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -14,7 +14,8 @@ const ( dbNodes = "nodes" dbServices = "services" dbChecks = "checks" - dbMaxMapSize = 128 * 1024 * 1024 // 128MB maximum size + dbKVS = "kvs" + dbMaxMapSize = 512 * 1024 * 1024 // 512MB maximum size ) // The StateStore is responsible for maintaining all the Consul @@ -31,6 +32,7 @@ type StateStore struct { nodeTable *MDBTable serviceTable *MDBTable checkTable *MDBTable + kvsTable *MDBTable tables MDBTables watch map[*MDBTable]*NotifyGroup queryTables map[string]MDBTables @@ -183,8 +185,31 @@ func (s *StateStore) initialize() error { }, } + s.kvsTable = &MDBTable{ + Name: dbKVS, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"Key"}, + IdxFunc: DefaultIndexPrefixFunc, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.DirEntry) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + // Store the set of tables - s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable} + s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder @@ -206,6 +231,8 @@ func (s *StateStore) initialize() error { "NodeChecks": MDBTables{s.checkTable}, "ServiceChecks": MDBTables{s.checkTable}, "CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable}, + "KVSGet": MDBTables{s.kvsTable}, + "KVSList": MDBTables{s.kvsTable}, } return nil } @@ -686,6 +713,159 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e return nodes } +// KVSSet is used to create or update a KV entry +func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + // Get the existing node + res, err := s.kvsTable.GetTxn(tx, "id", d.Key) + if err != nil { + return err + } + + // Set the create and modify times + if len(res) == 0 { + d.CreateIndex = index + } else { + d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex + } + d.ModifyIndex = index + + if err := s.kvsTable.InsertTxn(tx, d); err != nil { + return err + } + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.kvsTable].Notify() + return tx.Commit() +} + +// KVSRestore is used to restore a DirEntry. It should only be used when +// doing a restore, otherwise KVSSet should be used. +func (s *StateStore) KVSRestore(d *structs.DirEntry) error { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + if err := s.kvsTable.InsertTxn(tx, d); err != nil { + return err + } + return tx.Commit() +} + +// KVSGet is used to get a KV entry +func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { + idx, res, err := s.kvsTable.Get("id", key) + var d *structs.DirEntry + if len(res) > 0 { + d = res[0].(*structs.DirEntry) + } + return idx, d, err +} + +// KVSList is used to list all KV entries with a prefix +func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { + idx, res, err := s.kvsTable.Get("id_prefix", prefix) + ents := make(structs.DirEntries, len(res)) + for idx, r := range res { + ents[idx] = r.(*structs.DirEntry) + } + return idx, ents, err +} + +// KVSDelete is used to delete a KVS entry +func (s *StateStore) KVSDelete(index uint64, key string) error { + return s.kvsDeleteWithIndex(index, "id", key) +} + +// KVSDeleteTree is used to delete all keys with a given prefix +func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { + if prefix == "" { + return s.kvsDeleteWithIndex(index, "id") + } + return s.kvsDeleteWithIndex(index, "id_prefix", prefix) +} + +// kvsDeleteWithIndex does a delete with either the id or id_prefix +func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...) + if err != nil { + return err + } + + if num > 0 { + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + defer s.watch[s.kvsTable].Notify() + } + return tx.Commit() +} + +// KVSCheckAndSet is used to perform an atomic check-and-set +func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { + // Start a new txn + tx, err := s.kvsTable.StartTxn(false, nil) + if err != nil { + return false, err + } + defer tx.Abort() + + // Get the existing node + res, err := s.kvsTable.GetTxn(tx, "id", d.Key) + if err != nil { + return false, err + } + + // Get the existing node if any + var exist *structs.DirEntry + if len(res) > 0 { + exist = res[0].(*structs.DirEntry) + } + + // Use the ModifyIndex as the constraint. A modify of time of 0 + // means we are doing a set-if-not-exists, while any other value + // means we expect that modify time. + if d.ModifyIndex == 0 && exist != nil { + return false, nil + } else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) { + return false, nil + } + + // Set the create and modify times + if exist == nil { + d.CreateIndex = index + } else { + d.CreateIndex = exist.CreateIndex + } + d.ModifyIndex = index + + if err := s.kvsTable.InsertTxn(tx, d); err != nil { + return false, err + } + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return false, err + } + defer s.watch[s.kvsTable].Notify() + return true, tx.Commit() +} + // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { // Begin a new txn on all tables @@ -742,3 +922,10 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks { _, checks := s.store.parseHealthChecks(s.lastIndex, res, err) return checks } + +// KVSDump is used to list all KV entries. It takes a channel and streams +// back *struct.DirEntry objects. This will block and should be invoked +// in a goroutine. +func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error { + return s.store.kvsTable.StreamTxn(stream, s.tx, "id") +} diff --git a/consul/state_store_test.go b/consul/state_store_test.go index de5ccd8058..5e5bf3e8e1 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -550,6 +550,16 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("err: %v") } + // Add some KVS entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(14, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(15, d); err != nil { + t.Fatalf("err: %v", err) + } + // Take a snapshot snap, err := store.Snapshot() if err != nil { @@ -558,7 +568,7 @@ func TestStoreSnapshot(t *testing.T) { defer snap.Close() // Check the last nodes - if idx := snap.LastIndex(); idx != 13 { + if idx := snap.LastIndex(); idx != 15 { t.Fatalf("bad: %v", idx) } @@ -591,6 +601,28 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("bad: %v", checks[0]) } + // Check we have the entries + streamCh := make(chan interface{}, 64) + doneCh := make(chan struct{}) + var ents []*structs.DirEntry + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.KVSDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 2 { + t.Fatalf("missing KVS entries!") + } + // Make some changes! if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil { t.Fatalf("err: %v", err) @@ -612,6 +644,10 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("err: %v") } + if err := store.KVSDelete(18, "/web/a"); err != nil { + t.Fatalf("err: %v") + } + // Check snapshot has old values nodes = snap.Nodes() if len(nodes) != 2 { @@ -639,6 +675,28 @@ func TestStoreSnapshot(t *testing.T) { if !reflect.DeepEqual(checks[0], check) { t.Fatalf("bad: %v", checks[0]) } + + // Check we have the entries + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.KVSDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 2 { + t.Fatalf("missing KVS entries!") + } } func TestEnsureCheck(t *testing.T) { @@ -933,3 +991,279 @@ func TestSS_Register_Deregister_Query(t *testing.T) { t.Fatalf("Bad: %v", nodes) } } + +func TestKVSSet_Get(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Should not exist + idx, d, err := store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 0 { + t.Fatalf("bad: %v", idx) + } + if d != nil { + t.Fatalf("bad: %v", d) + } + + // Create the entry + d = &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should exist exist + idx, d, err = store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if d.CreateIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.ModifyIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.Key != "/foo" { + t.Fatalf("bad: %v", d) + } + if d.Flags != 42 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "test" { + t.Fatalf("bad: %v", d) + } + + // Update the entry + d = &structs.DirEntry{Key: "/foo", Flags: 43, Value: []byte("zip")} + if err := store.KVSSet(1010, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should update + idx, d, err = store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1010 { + t.Fatalf("bad: %v", idx) + } + if d.CreateIndex != 1000 { + t.Fatalf("bad: %v", d) + } + if d.ModifyIndex != 1010 { + t.Fatalf("bad: %v", d) + } + if d.Key != "/foo" { + t.Fatalf("bad: %v", d) + } + if d.Flags != 43 { + t.Fatalf("bad: %v", d) + } + if string(d.Value) != "zip" { + t.Fatalf("bad: %v", d) + } +} + +func TestKVSDelete(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entry + d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Delete the entry + if err := store.KVSDelete(1020, "/foo"); err != nil { + t.Fatalf("err: %v", err) + } + + // Should not exist + idx, d, err := store.KVSGet("/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1020 { + t.Fatalf("bad: %v", idx) + } + if d != nil { + t.Fatalf("bad: %v", d) + } +} + +func TestKVSCheckAndSet(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // CAS should fail, no entry + d := &structs.DirEntry{ + ModifyIndex: 100, + Key: "/foo", + Flags: 42, + Value: []byte("test"), + } + ok, err := store.KVSCheckAndSet(1000, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // Constrain on not-exist, should work + d.ModifyIndex = 0 + ok, err = store.KVSCheckAndSet(1001, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected commit") + } + + // Constrain on not-exist, should fail + d.ModifyIndex = 0 + ok, err = store.KVSCheckAndSet(1002, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // Constrain on a wrong modify time + d.ModifyIndex = 1000 + ok, err = store.KVSCheckAndSet(1003, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if ok { + t.Fatalf("unexpected commit") + } + + // Constrain on a correct modify time + d.ModifyIndex = 1001 + ok, err = store.KVSCheckAndSet(1004, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("expected commit") + } +} + +func TestKVS_List(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Should not exist + idx, ents, err := store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 0 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 0 { + t.Fatalf("bad: %v", ents) + } + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Should list + idx, ents, err = store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1002 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 3 { + t.Fatalf("bad: %v", ents) + } + + if ents[0].Key != "/web/a" { + t.Fatalf("bad: %v", ents[0]) + } + if ents[1].Key != "/web/b" { + t.Fatalf("bad: %v", ents[1]) + } + if ents[2].Key != "/web/sub/c" { + t.Fatalf("bad: %v", ents[2]) + } +} + +func TestKVSDeleteTree(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Should not exist + err = store.KVSDeleteTree(1000, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke the web tree + err = store.KVSDeleteTree(1010, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Nothing should list + idx, ents, err := store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1010 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 0 { + t.Fatalf("bad: %v", ents) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 5b32320f28..638749cfc0 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -18,6 +18,7 @@ type MessageType uint8 const ( RegisterRequestType MessageType = iota DeregisterRequestType + KVSRequestType ) const ( @@ -172,6 +173,45 @@ type IndexedCheckServiceNodes struct { Nodes CheckServiceNodes } +// DirEntry is used to represent a directory entry. This is +// used for values in our Key-Value store. +type DirEntry struct { + CreateIndex uint64 + ModifyIndex uint64 + Key string + Flags uint64 + Value []byte +} +type DirEntries []*DirEntry + +type KVSOp string + +const ( + KVSSet KVSOp = "set" + KVSDelete = "delete" + KVSDeleteTree = "delete-tree" + KVSCAS = "cas" // Check-and-set +) + +// KVSRequest is used to operate on the Key-Value store +type KVSRequest struct { + Datacenter string + Op KVSOp // Which operation are we performing + DirEnt DirEntry // Which directory entry +} + +// KeyRequest is used to request a key, or key prefix +type KeyRequest struct { + Datacenter string + Key string + BlockingQuery +} + +type IndexedDirEntries struct { + Index uint64 + Entries DirEntries +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index 0f0bd7a38b..b938e7ec94 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -7,11 +7,12 @@ sidebar_current: "docs-agent-http" # HTTP API The main interface to Consul is a RESTful HTTP API. The API can be -used for CRUD for nodes, services, and checks. The endpoints are +used for CRUD for nodes, services, checks, and configuration. The endpoints are versioned to enable changes without breaking backwards compatibility. -All endpoints fall into one of 4 categories: +All endpoints fall into one of 5 categories: +* kv - Key/Value store * agent - Agent control * catalog - Manages nodes and services * health - Manages health checks @@ -28,7 +29,7 @@ Queries that support this will mention it specifically, however the use of this feature is the same for all. If supported, the query will set an HTTP header "X-Consul-Index". This is an opaque handle that the client will use. -To cause a query to block, the query parameters "?wait=&index=" are added +To cause a query to block, the query parameters "?wait=\&index=\" are added to a request. The "?wait=" query parameter limits how long the query will potentially block for. It not set, it will default to 10 minutes. It can be specified in the form of "10s" or "5m", which is 10 seconds or 5 minutes respectively. The "?index=" parameter is an @@ -41,6 +42,72 @@ note is that when the query returns there is **no guarantee** of a change. It is possible that the timeout was reached, or that there was an idempotent write that does not affect the result. + +## KV + +The KV endpoint is used to expose a simple key/value store. This can be used +to store service configurations or other meta data in a simple way. It has only +a single endpoint: + + /v1/kv/ + +This is the only endpoint that is used with the Key/Value store. +It's use depends on the HTTP method. The `GET`, `PUT` and `DELETE` methods +are all supported. + +When using the `GET` method, Consul will return the specified key, +or if the "?recurse" query parameter is provided, it will return +all keys with the given prefix. + +Each object will look like: + + [ + { + "CreateIndex":100, + "ModifyIndex":200, + "Key":"zip", + "Flags":0, + "Value":"dGVzdA==" + } + ] + +The `CreateIndex` is the internal index value that represents +when the entry was created. The `ModifyIndex` is the last index +that modified this key. This index corresponds to the `X-Consul-Index` +header value that is returned. A blocking query can be used to wait for +a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds +to the latest `ModifyIndex` and so a blocking query waits until any of the +listed keys are updated. + +The `Key` is simply the full path of the entry. `Flags` are an opaque +unsigned integer that can be attached to each entry. The use of this is +left totally to the user. Lastly, the `Value` is a base64 key value. + +If no entries are found, a 404 code is returned. + +When using the `PUT` method, Consul expects the request body to be the +value corresponding to the key. There are a number of parameters that can +be used with a PUT request: + +* ?flags=\ : This can be used to specify an unsigned value between + 0 and 2^64-1. It is opaque to the user, but a client application may + use it. + +* ?cas=\ : This flag is used to turn the `PUT` into a **Check-And-Set** + operation. This is very useful as it allows clients to build more complex + syncronization primitives on top. If the index is 0, then Consul will only + put the key if it does not already exist. If the index is non-zero, then + the key is only set if the index matches the `ModifyIndex` of that key. + +The return value is simply either `true` or `false`. If the CAS check fails, +then `false` will be returned. + +Lastly, the `DELETE` method can be used to delete a single key or all +keys sharing a prefix. If the "?recurse" query parameter is provided, +then all keys with the prefix are deleted, otherwise only the specified +key. + + ## Agent The Agent endpoints are used to interact with a local Consul agent. Usually, diff --git a/website/source/docs/commands/info.html.markdown b/website/source/docs/commands/info.html.markdown index 0cb95f944e..f66c83a127 100644 --- a/website/source/docs/commands/info.html.markdown +++ b/website/source/docs/commands/info.html.markdown @@ -35,30 +35,34 @@ Here is an example output: num_peers = 2 state = Leader term = 4 - serf-lan: - event-queue = 0 - event-time = 2 + serf_lan: + event_queue = 0 + event_time = 2 failed = 0 - intent-queue = 0 + intent_queue = 0 left = 0 - member-time = 7 + member_time = 7 members = 3 - serf-wan: - event-queue = 0 - event-time = 1 + query_queue = 0 + query_time = 1 + serf_wan: + event_queue = 0 + event_time = 1 failed = 0 - intent-queue = 0 + intent_queue = 0 left = 0 - member-time = 1 + member_time = 1 members = 1 + query_queue = 0 + query_time = 1 There are currently the top-level keys for: * agent: Provides information about the agent * consul: Information about the consul library (client or server) * raft: Provides info about the Raft [consensus library](/docs/internals/consensus.html) -* serf-lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html) -* serf-wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html) +* serf_lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html) +* serf_wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html) ## Usage