mirror of
https://github.com/status-im/consul.git
synced 2025-01-27 14:05:45 +00:00
Merge branch 'master' into jf_improve_consensus_internals_doc
This commit is contained in:
commit
bd99f56bc6
@ -88,6 +88,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
||||
s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
|
||||
s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService))
|
||||
|
||||
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
|
||||
|
||||
if enableDebug {
|
||||
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
||||
|
@ -8,6 +8,7 @@ register new services.
|
||||
The URLs are also versioned to allow for changes in the API.
|
||||
The current URLs supported are:
|
||||
|
||||
Catalog:
|
||||
* /v1/catalog/register : Registers a new service
|
||||
* /v1/catalog/deregister : Deregisters a service or node
|
||||
* /v1/catalog/datacenters : Lists known datacenters
|
||||
@ -16,15 +17,17 @@ The current URLs supported are:
|
||||
* /v1/catalog/service/<service>/ : Lists the nodes in a given service
|
||||
* /v1/catalog/node/<node>/ : Lists the services provided by a node
|
||||
|
||||
* Health system:
|
||||
Health system:
|
||||
* /v1/health/node/<node>: Returns the health info of a node
|
||||
* /v1/health/checks/<service>: Returns the checks of a service
|
||||
* /v1/health/service/<service>: Returns the nodes and health info of a service
|
||||
* /v1/health/state/<state>: Returns the checks in a given state
|
||||
|
||||
Status:
|
||||
* /v1/status/leader : Returns the current Raft leader
|
||||
* /v1/status/peers : Returns the current Raft peer set
|
||||
|
||||
Agent:
|
||||
* /v1/agent/checks: Returns the checks the local agent is managing
|
||||
* /v1/agent/services : Returns the services local agent is managing
|
||||
* /v1/agent/members : Returns the members as seen by the local serf agent
|
||||
@ -37,3 +40,7 @@ The current URLs supported are:
|
||||
* /v1/agent/check/fail/<name>
|
||||
* /v1/agent/service/register
|
||||
* /v1/agent/service/deregister/<name>
|
||||
|
||||
KVS:
|
||||
* /v1/kv/<key>
|
||||
|
||||
|
153
command/agent/kvs_endpoint.go
Normal file
153
command/agent/kvs_endpoint.go
Normal file
@ -0,0 +1,153 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
// Set default DC
|
||||
args := structs.KeyRequest{}
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Pull out the key name, validation left to each sub-handler
|
||||
args.Key = strings.TrimPrefix(req.URL.Path, "/v1/kv/")
|
||||
|
||||
// Switch on the method
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
return s.KVSGet(resp, req, &args)
|
||||
case "PUT":
|
||||
return s.KVSPut(resp, req, &args)
|
||||
case "DELETE":
|
||||
return s.KVSDelete(resp, req, &args)
|
||||
default:
|
||||
resp.WriteHeader(405)
|
||||
return nil, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// KVSGet handles a GET request
|
||||
func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
|
||||
// Check for recurse
|
||||
method := "KVS.Get"
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["recurse"]; ok {
|
||||
method = "KVS.List"
|
||||
} else if missingKey(resp, args) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Make the RPC
|
||||
var out structs.IndexedDirEntries
|
||||
if err := s.agent.RPC(method, &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setIndex(resp, out.Index)
|
||||
|
||||
// Check if we get a not found
|
||||
if len(out.Entries) == 0 {
|
||||
resp.WriteHeader(404)
|
||||
return nil, nil
|
||||
}
|
||||
return out.Entries, nil
|
||||
}
|
||||
|
||||
// KVSPut handles a PUT request
|
||||
func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
|
||||
if missingKey(resp, args) {
|
||||
return nil, nil
|
||||
}
|
||||
applyReq := structs.KVSRequest{
|
||||
Datacenter: args.Datacenter,
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: args.Key,
|
||||
Flags: 0,
|
||||
Value: nil,
|
||||
},
|
||||
}
|
||||
|
||||
// Check for flags
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["flags"]; ok {
|
||||
flagVal, err := strconv.ParseUint(params.Get("flags"), 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
applyReq.DirEnt.Flags = flagVal
|
||||
}
|
||||
|
||||
// Check for cas value
|
||||
if _, ok := params["cas"]; ok {
|
||||
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
applyReq.DirEnt.ModifyIndex = casVal
|
||||
applyReq.Op = structs.KVSCAS
|
||||
}
|
||||
|
||||
// Copy the value
|
||||
buf := bytes.NewBuffer(nil)
|
||||
if _, err := io.Copy(buf, req.Body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
applyReq.DirEnt.Value = buf.Bytes()
|
||||
|
||||
// Make the RPC
|
||||
var out bool
|
||||
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Only use the out value if this was a CAS
|
||||
if applyReq.Op == structs.KVSSet {
|
||||
return true, nil
|
||||
} else {
|
||||
return out, nil
|
||||
}
|
||||
}
|
||||
|
||||
// KVSPut handles a DELETE request
|
||||
func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
|
||||
applyReq := structs.KVSRequest{
|
||||
Datacenter: args.Datacenter,
|
||||
Op: structs.KVSDelete,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: args.Key,
|
||||
},
|
||||
}
|
||||
|
||||
// Check for recurse
|
||||
params := req.URL.Query()
|
||||
if _, ok := params["recurse"]; ok {
|
||||
applyReq.Op = structs.KVSDeleteTree
|
||||
} else if missingKey(resp, args) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Make the RPC
|
||||
var out bool
|
||||
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// missingKey checks if the key is missing
|
||||
func missingKey(resp http.ResponseWriter, args *structs.KeyRequest) bool {
|
||||
if args.Key == "" {
|
||||
resp.WriteHeader(400)
|
||||
resp.Write([]byte("Missing key name"))
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
291
command/agent/kvs_endpoint_test.go
Normal file
291
command/agent/kvs_endpoint_test.go
Normal file
@ -0,0 +1,291 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
// Wait for a leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
keys := []string{
|
||||
"baz",
|
||||
"bar",
|
||||
"foo/sub1",
|
||||
"foo/sub2",
|
||||
"zip",
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
buf := bytes.NewBuffer([]byte("test"))
|
||||
req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
req, err := http.NewRequest("GET", "/v1/kv/"+key, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
header := resp.Header().Get("X-Consul-Index")
|
||||
if header == "" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
|
||||
res, ok := obj.(structs.DirEntries)
|
||||
if !ok {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
|
||||
if len(res) != 1 {
|
||||
t.Fatalf("bad: %v", res)
|
||||
}
|
||||
|
||||
if res[0].Key != key {
|
||||
t.Fatalf("bad: %v", res)
|
||||
}
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
req, err := http.NewRequest("DELETE", "/v1/kv/"+key, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
_, err = srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSEndpoint_Recurse(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
// Wait for a leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
keys := []string{
|
||||
"bar",
|
||||
"baz",
|
||||
"foo/sub1",
|
||||
"foo/sub2",
|
||||
"zip",
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
buf := bytes.NewBuffer([]byte("test"))
|
||||
req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Get all the keys
|
||||
req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
header := resp.Header().Get("X-Consul-Index")
|
||||
if header == "" {
|
||||
t.Fatalf("Bad: %v", header)
|
||||
}
|
||||
|
||||
res, ok := obj.(structs.DirEntries)
|
||||
if !ok {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
|
||||
if len(res) != len(keys) {
|
||||
t.Fatalf("bad: %v", res)
|
||||
}
|
||||
|
||||
for idx, key := range keys {
|
||||
if res[idx].Key != key {
|
||||
t.Fatalf("bad: %v %v", res[idx].Key, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
req, err := http.NewRequest("DELETE", "/v1/kv/?recurse", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
_, err = srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// Get all the keys
|
||||
req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if obj != nil {
|
||||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSEndpoint_CAS(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
// Wait for a leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte("test"))
|
||||
req, err := http.NewRequest("PUT", "/v1/kv/test?flags=50", buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", "/v1/kv/test", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d := obj.(structs.DirEntries)[0]
|
||||
|
||||
// Check the flags
|
||||
if d.Flags != 50 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
|
||||
// Create a CAS request, bad index
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte("zip"))
|
||||
req, err := http.NewRequest("PUT",
|
||||
fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex-1), buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); res {
|
||||
t.Fatalf("should NOT work")
|
||||
}
|
||||
}
|
||||
|
||||
// Create a CAS request, good index
|
||||
{
|
||||
buf := bytes.NewBuffer([]byte("zip"))
|
||||
req, err := http.NewRequest("PUT",
|
||||
fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex), buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
}
|
||||
|
||||
// Verify the update
|
||||
req, _ = http.NewRequest("GET", "/v1/kv/test", nil)
|
||||
resp = httptest.NewRecorder()
|
||||
obj, _ = srv.KVSEndpoint(resp, req)
|
||||
d = obj.(structs.DirEntries)[0]
|
||||
|
||||
if d.Flags != 42 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if string(d.Value) != "zip" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
@ -65,6 +65,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
||||
return c.decodeRegister(buf[1:], log.Index)
|
||||
case structs.DeregisterRequestType:
|
||||
return c.applyDeregister(buf[1:], log.Index)
|
||||
case structs.KVSRequestType:
|
||||
return c.applyKVSOperation(buf[1:], log.Index)
|
||||
default:
|
||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||
}
|
||||
@ -131,6 +133,32 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.KVSRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
switch req.Op {
|
||||
case structs.KVSSet:
|
||||
return c.state.KVSSet(index, &req.DirEnt)
|
||||
case structs.KVSDelete:
|
||||
return c.state.KVSDelete(index, req.DirEnt.Key)
|
||||
case structs.KVSDeleteTree:
|
||||
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
|
||||
case structs.KVSCAS:
|
||||
act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
return act
|
||||
}
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
defer func(start time.Time) {
|
||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||
@ -152,6 +180,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.state.Close()
|
||||
c.state = state
|
||||
|
||||
// Create a decoder
|
||||
@ -184,6 +213,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||
}
|
||||
c.applyRegister(&req, header.LastIndex)
|
||||
|
||||
case structs.KVSRequestType:
|
||||
var req structs.DirEntry
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.state.KVSRestore(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
||||
}
|
||||
@ -247,6 +285,38 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Enable GC of the ndoes
|
||||
nodes = nil
|
||||
|
||||
// Dump the KVS entries
|
||||
streamCh := make(chan interface{}, 256)
|
||||
errorCh := make(chan error)
|
||||
go func() {
|
||||
if err := s.state.KVSDump(streamCh); err != nil {
|
||||
errorCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case raw := <-streamCh:
|
||||
if raw == nil {
|
||||
break OUTER
|
||||
}
|
||||
sink.Write([]byte{byte(structs.KVSRequestType)})
|
||||
if err := encoder.Encode(raw); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
case err := <-errorCh:
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -322,6 +322,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "web",
|
||||
})
|
||||
fsm.state.KVSSet(8, &structs.DirEntry{
|
||||
Key: "/test",
|
||||
Value: []byte("foo"),
|
||||
})
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
@ -370,4 +374,198 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
||||
if len(checks) != 1 {
|
||||
t.Fatalf("Bad: %v", checks)
|
||||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet("/test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if string(d.Value) != "foo" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_KVSSet(t *testing.T) {
|
||||
fsm, err := NewFSM(os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer fsm.Close()
|
||||
|
||||
req := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "/test/path",
|
||||
Flags: 0,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("missing")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_KVSDelete(t *testing.T) {
|
||||
fsm, err := NewFSM(os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer fsm.Close()
|
||||
|
||||
req := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "/test/path",
|
||||
Flags: 0,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Run the delete
|
||||
req.Op = structs.KVSDelete
|
||||
buf, err = structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify key is not set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d != nil {
|
||||
t.Fatalf("key present")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_KVSDeleteTree(t *testing.T) {
|
||||
fsm, err := NewFSM(os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer fsm.Close()
|
||||
|
||||
req := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "/test/path",
|
||||
Flags: 0,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Run the delete tree
|
||||
req.Op = structs.KVSDeleteTree
|
||||
req.DirEnt.Key = "/test"
|
||||
buf, err = structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify key is not set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d != nil {
|
||||
t.Fatalf("key present")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_KVSCheckAndSet(t *testing.T) {
|
||||
fsm, err := NewFSM(os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer fsm.Close()
|
||||
|
||||
req := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "/test/path",
|
||||
Flags: 0,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("key missing")
|
||||
}
|
||||
|
||||
// Run the check-and-set
|
||||
req.Op = structs.KVSCAS
|
||||
req.DirEnt.ModifyIndex = d.ModifyIndex
|
||||
req.DirEnt.Value = []byte("zip")
|
||||
buf, err = structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp.(bool) != true {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify key is updated
|
||||
_, d, err = fsm.state.KVSGet("/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if string(d.Value) != "zip" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
115
consul/kvs_endpoint.go
Normal file
115
consul/kvs_endpoint.go
Normal file
@ -0,0 +1,115 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"time"
|
||||
)
|
||||
|
||||
// KVS endpoint is used to manipulate the Key-Value store
|
||||
type KVS struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// Apply is used to apply a KVS request to the data store. This should
|
||||
// only be used for operations that modify the data
|
||||
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
||||
if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
|
||||
|
||||
// Verify the args
|
||||
if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree {
|
||||
return fmt.Errorf("Must provide key")
|
||||
}
|
||||
|
||||
// Apply the update
|
||||
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
|
||||
if err != nil {
|
||||
k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
// Check if the return type is a bool
|
||||
if respBool, ok := resp.(bool); ok {
|
||||
*reply = respBool
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get is used to lookup a single key
|
||||
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
|
||||
if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
return k.srv.blockingRPC(&args.BlockingQuery,
|
||||
state.QueryTables("KVSGet"),
|
||||
func() (uint64, error) {
|
||||
index, ent, err := state.KVSGet(args.Key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if ent == nil {
|
||||
// Must provide non-zero index to prevent blocking
|
||||
// Index 1 is impossible anyways (due to Raft internals)
|
||||
if index == 0 {
|
||||
reply.Index = 1
|
||||
} else {
|
||||
reply.Index = index
|
||||
}
|
||||
reply.Entries = nil
|
||||
} else {
|
||||
reply.Index = ent.ModifyIndex
|
||||
reply.Entries = structs.DirEntries{ent}
|
||||
}
|
||||
return reply.Index, nil
|
||||
})
|
||||
}
|
||||
|
||||
// List is used to list all keys with a given prefix
|
||||
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
|
||||
if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the local state
|
||||
state := k.srv.fsm.State()
|
||||
return k.srv.blockingRPC(&args.BlockingQuery,
|
||||
state.QueryTables("KVSList"),
|
||||
func() (uint64, error) {
|
||||
index, ent, err := state.KVSList(args.Key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(ent) == 0 {
|
||||
// Must provide non-zero index to prevent blocking
|
||||
// Index 1 is impossible anyways (due to Raft internals)
|
||||
if index == 0 {
|
||||
reply.Index = 1
|
||||
} else {
|
||||
reply.Index = index
|
||||
}
|
||||
reply.Entries = nil
|
||||
} else {
|
||||
// Determine the maximum affected index
|
||||
var maxIndex uint64
|
||||
for _, e := range ent {
|
||||
if e.ModifyIndex > maxIndex {
|
||||
maxIndex = e.ModifyIndex
|
||||
}
|
||||
}
|
||||
|
||||
reply.Index = maxIndex
|
||||
reply.Entries = ent
|
||||
}
|
||||
return reply.Index, nil
|
||||
})
|
||||
}
|
173
consul/kvs_endpoint_test.go
Normal file
173
consul/kvs_endpoint_test.go
Normal file
@ -0,0 +1,173 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestKVS_Apply(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
// Wait for leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
arg := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "test",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify
|
||||
state := s1.fsm.State()
|
||||
_, d, err := state.KVSGet("test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("should not be nil")
|
||||
}
|
||||
|
||||
// Do a check and set
|
||||
arg.Op = structs.KVSCAS
|
||||
arg.DirEnt.ModifyIndex = d.ModifyIndex
|
||||
arg.DirEnt.Flags = 43
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check this was applied
|
||||
if out != true {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
|
||||
// Verify
|
||||
_, d, err = state.KVSGet("test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d.Flags != 43 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVS_Get(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
// Wait for leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
arg := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "test",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
getR := structs.KeyRequest{
|
||||
Datacenter: "dc1",
|
||||
Key: "test",
|
||||
}
|
||||
var dirent structs.IndexedDirEntries
|
||||
if err := client.Call("KVS.Get", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if dirent.Index == 0 {
|
||||
t.Fatalf("Bad: %v", dirent)
|
||||
}
|
||||
if len(dirent.Entries) != 1 {
|
||||
t.Fatalf("Bad: %v", dirent)
|
||||
}
|
||||
d := dirent.Entries[0]
|
||||
if d.Flags != 42 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if string(d.Value) != "test" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSEndpoint_List(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
// Wait for leader
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
keys := []string{
|
||||
"/test/key1",
|
||||
"/test/key2",
|
||||
"/test/sub/key3",
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
arg := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: key,
|
||||
Flags: 1,
|
||||
},
|
||||
}
|
||||
var out bool
|
||||
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
getR := structs.KeyRequest{
|
||||
Datacenter: "dc1",
|
||||
Key: "/test",
|
||||
}
|
||||
var dirent structs.IndexedDirEntries
|
||||
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if dirent.Index == 0 {
|
||||
t.Fatalf("Bad: %v", dirent)
|
||||
}
|
||||
if len(dirent.Entries) != 3 {
|
||||
t.Fatalf("Bad: %v", dirent.Entries)
|
||||
}
|
||||
for i := 0; i < len(dirent.Entries); i++ {
|
||||
d := dirent.Entries[i]
|
||||
if d.Key != keys[i] {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.Flags != 1 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.Value != nil {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
}
|
@ -49,10 +49,13 @@ type MDBIndex struct {
|
||||
Unique bool // Controls if values are unique
|
||||
Fields []string // Fields are used to build the index
|
||||
IdxFunc IndexFunc // Can be used to provide custom indexing
|
||||
Virtual bool // Virtual index does not exist, but can be used for queries
|
||||
RealIndex string // Virtual indexes use a RealIndex for iteration
|
||||
|
||||
table *MDBTable
|
||||
name string
|
||||
dbiName string
|
||||
table *MDBTable
|
||||
name string
|
||||
dbiName string
|
||||
realIndex *MDBIndex
|
||||
}
|
||||
|
||||
// MDBTxn is used to wrap an underlying transaction
|
||||
@ -88,6 +91,17 @@ func DefaultIndexFunc(idx *MDBIndex, parts []string) string {
|
||||
return prefix
|
||||
}
|
||||
|
||||
// DefaultIndexPrefixFunc can be used with DefaultIndexFunc to scan
|
||||
// for index prefix values. This should only be used as part of a
|
||||
// virtual index.
|
||||
func DefaultIndexPrefixFunc(idx *MDBIndex, parts []string) string {
|
||||
if len(parts) == 0 {
|
||||
return "_"
|
||||
}
|
||||
prefix := "_" + strings.Join(parts, "||")
|
||||
return prefix
|
||||
}
|
||||
|
||||
// Init is used to initialize the MDBTable and ensure it's ready
|
||||
func (t *MDBTable) Init() error {
|
||||
if t.Env == nil {
|
||||
@ -111,6 +125,9 @@ func (t *MDBTable) Init() error {
|
||||
if id.AllowBlank {
|
||||
return fmt.Errorf("id index must not allow blanks")
|
||||
}
|
||||
if id.Virtual {
|
||||
return fmt.Errorf("id index cannot be virtual")
|
||||
}
|
||||
|
||||
// Create the table
|
||||
if err := t.createTable(); err != nil {
|
||||
@ -221,6 +238,9 @@ EXTEND:
|
||||
mdbTxn.dbis[t.Name] = dbi
|
||||
|
||||
for _, index := range t.Indexes {
|
||||
if index.Virtual {
|
||||
continue
|
||||
}
|
||||
dbi, err := index.openDBI(tx)
|
||||
if err != nil {
|
||||
tx.Abort()
|
||||
@ -237,6 +257,9 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) {
|
||||
// Construct the indexes keys
|
||||
indexes := make(map[string][]byte)
|
||||
for name, index := range t.Indexes {
|
||||
if index.Virtual {
|
||||
continue
|
||||
}
|
||||
key, err := index.keyFromObject(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -301,6 +324,9 @@ AFTER_DELETE:
|
||||
|
||||
// Insert the new indexes
|
||||
for name, index := range t.Indexes {
|
||||
if index.Virtual {
|
||||
continue
|
||||
}
|
||||
dbi := tx.dbis[index.dbiName]
|
||||
if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil {
|
||||
return err
|
||||
@ -350,6 +376,29 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac
|
||||
return results, err
|
||||
}
|
||||
|
||||
// StreamTxn is like GetTxn but it streams the results over a channel.
|
||||
// This can be used if the expected data set is very large. The stream
|
||||
// is always closed on return.
|
||||
func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error {
|
||||
// Always close the stream on return
|
||||
defer close(stream)
|
||||
|
||||
// Get the associated index
|
||||
idx, key, err := t.getIndex(index, parts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Stream the results
|
||||
err = idx.iterate(tx, key, func(encRowId, res []byte) bool {
|
||||
obj := t.Decoder(res)
|
||||
stream <- obj
|
||||
return false
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// getIndex is used to get the proper index, and also check the arity
|
||||
func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) {
|
||||
// Get the index
|
||||
@ -427,6 +476,12 @@ func (t *MDBTable) deleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (num i
|
||||
if name == idx.name {
|
||||
continue
|
||||
}
|
||||
if idx.Virtual && name == idx.RealIndex {
|
||||
continue
|
||||
}
|
||||
if otherIdx.Virtual {
|
||||
continue
|
||||
}
|
||||
dbi := tx.dbis[otherIdx.dbiName]
|
||||
if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil {
|
||||
panic(err)
|
||||
@ -464,11 +519,23 @@ func (i *MDBIndex) init(table *MDBTable, name string) error {
|
||||
if err := i.createIndex(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Verify real index exists
|
||||
if i.Virtual {
|
||||
if realIndex, ok := table.Indexes[i.RealIndex]; !ok {
|
||||
return fmt.Errorf("real index '%s' missing", i.RealIndex)
|
||||
} else {
|
||||
i.realIndex = realIndex
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createIndex is used to ensure the index exists
|
||||
func (i *MDBIndex) createIndex() error {
|
||||
// Do not create if this is a virtual index
|
||||
if i.Virtual {
|
||||
return nil
|
||||
}
|
||||
tx, err := i.table.Env.BeginTxn(nil, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -529,7 +596,14 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte {
|
||||
func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
|
||||
cb func(encRowId, res []byte) bool) error {
|
||||
table := tx.dbis[i.table.Name]
|
||||
dbi := tx.dbis[i.dbiName]
|
||||
|
||||
// If virtual, use the correct DBI
|
||||
var dbi mdb.DBI
|
||||
if i.Virtual {
|
||||
dbi = tx.dbis[i.realIndex.dbiName]
|
||||
} else {
|
||||
dbi = tx.dbis[i.dbiName]
|
||||
}
|
||||
|
||||
cursor, err := tx.tx.CursorOpen(dbi)
|
||||
if err != nil {
|
||||
|
@ -781,3 +781,194 @@ func TestMDBTableDelete_Prefix(t *testing.T) {
|
||||
t.Fatalf("expect 2 result: %#v", res)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMDBTableVirtualIndex(t *testing.T) {
|
||||
dir, env := testMDBEnv(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer env.Close()
|
||||
|
||||
table := &MDBTable{
|
||||
Env: env,
|
||||
Name: "test",
|
||||
Indexes: map[string]*MDBIndex{
|
||||
"id": &MDBIndex{
|
||||
Unique: true,
|
||||
Fields: []string{"First"},
|
||||
},
|
||||
"id_prefix": &MDBIndex{
|
||||
Virtual: true,
|
||||
RealIndex: "id",
|
||||
Fields: []string{"First"},
|
||||
IdxFunc: DefaultIndexPrefixFunc,
|
||||
},
|
||||
},
|
||||
Encoder: MockEncoder,
|
||||
Decoder: MockDecoder,
|
||||
}
|
||||
if err := table.Init(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if table.lastRowID != 0 {
|
||||
t.Fatalf("bad last row id: %d", table.lastRowID)
|
||||
}
|
||||
|
||||
objs := []*MockData{
|
||||
&MockData{
|
||||
Key: "1",
|
||||
First: "Jack",
|
||||
Last: "Smith",
|
||||
Country: "USA",
|
||||
},
|
||||
&MockData{
|
||||
Key: "2",
|
||||
First: "John",
|
||||
Last: "Wang",
|
||||
Country: "USA",
|
||||
},
|
||||
&MockData{
|
||||
Key: "3",
|
||||
First: "James",
|
||||
Last: "Torres",
|
||||
Country: "Mexico",
|
||||
},
|
||||
}
|
||||
|
||||
// Insert some mock objects
|
||||
for idx, obj := range objs {
|
||||
if err := table.Insert(obj); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := table.SetLastIndex(uint64(4 * idx)); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if table.lastRowID != 3 {
|
||||
t.Fatalf("bad last row id: %d", table.lastRowID)
|
||||
}
|
||||
|
||||
if idx, _ := table.LastIndex(); idx != 8 {
|
||||
t.Fatalf("bad last idx: %d", idx)
|
||||
}
|
||||
|
||||
_, res, err := table.Get("id_prefix", "J")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(res) != 3 {
|
||||
t.Fatalf("expect 3 result: %#v", res)
|
||||
}
|
||||
|
||||
_, res, err = table.Get("id_prefix", "Ja")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(res) != 2 {
|
||||
t.Fatalf("expect 2 result: %#v", res)
|
||||
}
|
||||
|
||||
num, err := table.Delete("id_prefix", "Ja")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if num != 2 {
|
||||
t.Fatalf("expect 2 result: %#v", num)
|
||||
}
|
||||
|
||||
_, res, err = table.Get("id_prefix", "J")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(res) != 1 {
|
||||
t.Fatalf("expect 1 result: %#v", res)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMDBTableStream(t *testing.T) {
|
||||
dir, env := testMDBEnv(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer env.Close()
|
||||
|
||||
table := &MDBTable{
|
||||
Env: env,
|
||||
Name: "test",
|
||||
Indexes: map[string]*MDBIndex{
|
||||
"id": &MDBIndex{
|
||||
Unique: true,
|
||||
Fields: []string{"Key"},
|
||||
},
|
||||
"name": &MDBIndex{
|
||||
Fields: []string{"First", "Last"},
|
||||
},
|
||||
"country": &MDBIndex{
|
||||
Fields: []string{"Country"},
|
||||
},
|
||||
},
|
||||
Encoder: MockEncoder,
|
||||
Decoder: MockDecoder,
|
||||
}
|
||||
if err := table.Init(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
objs := []*MockData{
|
||||
&MockData{
|
||||
Key: "1",
|
||||
First: "Kevin",
|
||||
Last: "Smith",
|
||||
Country: "USA",
|
||||
},
|
||||
&MockData{
|
||||
Key: "2",
|
||||
First: "Kevin",
|
||||
Last: "Wang",
|
||||
Country: "USA",
|
||||
},
|
||||
&MockData{
|
||||
Key: "3",
|
||||
First: "Bernardo",
|
||||
Last: "Torres",
|
||||
Country: "Mexico",
|
||||
},
|
||||
}
|
||||
|
||||
// Insert some mock objects
|
||||
for idx, obj := range objs {
|
||||
if err := table.Insert(obj); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := table.SetLastIndex(uint64(idx + 1)); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start a readonly txn
|
||||
tx, err := table.StartTxn(true, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Stream the records
|
||||
streamCh := make(chan interface{})
|
||||
go func() {
|
||||
if err := table.StreamTxn(streamCh, tx, "id"); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Verify we get them all
|
||||
idx := 0
|
||||
for obj := range streamCh {
|
||||
p := obj.(*MockData)
|
||||
if !reflect.DeepEqual(p, objs[idx]) {
|
||||
t.Fatalf("bad: %#v %#v", p, objs[idx])
|
||||
}
|
||||
idx++
|
||||
}
|
||||
|
||||
if idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
}
|
||||
|
@ -18,3 +18,8 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error {
|
||||
future := r.server.raft.RemovePeer(peer)
|
||||
return future.Error()
|
||||
}
|
||||
|
||||
func (r *Raft) Snapshot(args struct{}, reply *struct{}) error {
|
||||
future := r.server.raft.Snapshot()
|
||||
return future.Error()
|
||||
}
|
||||
|
@ -101,6 +101,7 @@ type endpoints struct {
|
||||
Health *Health
|
||||
Raft *Raft
|
||||
Status *Status
|
||||
KVS *KVS
|
||||
}
|
||||
|
||||
// NewServer is used to construct a new Consul server from the
|
||||
@ -276,12 +277,14 @@ func (s *Server) setupRPC() error {
|
||||
s.endpoints.Raft = &Raft{s}
|
||||
s.endpoints.Catalog = &Catalog{s}
|
||||
s.endpoints.Health = &Health{s}
|
||||
s.endpoints.KVS = &KVS{s}
|
||||
|
||||
// Register the handlers
|
||||
s.rpcServer.Register(s.endpoints.Status)
|
||||
s.rpcServer.Register(s.endpoints.Raft)
|
||||
s.rpcServer.Register(s.endpoints.Catalog)
|
||||
s.rpcServer.Register(s.endpoints.Health)
|
||||
s.rpcServer.Register(s.endpoints.KVS)
|
||||
|
||||
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
||||
if err != nil {
|
||||
|
@ -14,7 +14,8 @@ const (
|
||||
dbNodes = "nodes"
|
||||
dbServices = "services"
|
||||
dbChecks = "checks"
|
||||
dbMaxMapSize = 128 * 1024 * 1024 // 128MB maximum size
|
||||
dbKVS = "kvs"
|
||||
dbMaxMapSize = 512 * 1024 * 1024 // 512MB maximum size
|
||||
)
|
||||
|
||||
// The StateStore is responsible for maintaining all the Consul
|
||||
@ -31,6 +32,7 @@ type StateStore struct {
|
||||
nodeTable *MDBTable
|
||||
serviceTable *MDBTable
|
||||
checkTable *MDBTable
|
||||
kvsTable *MDBTable
|
||||
tables MDBTables
|
||||
watch map[*MDBTable]*NotifyGroup
|
||||
queryTables map[string]MDBTables
|
||||
@ -183,8 +185,31 @@ func (s *StateStore) initialize() error {
|
||||
},
|
||||
}
|
||||
|
||||
s.kvsTable = &MDBTable{
|
||||
Name: dbKVS,
|
||||
Indexes: map[string]*MDBIndex{
|
||||
"id": &MDBIndex{
|
||||
Unique: true,
|
||||
Fields: []string{"Key"},
|
||||
},
|
||||
"id_prefix": &MDBIndex{
|
||||
Virtual: true,
|
||||
RealIndex: "id",
|
||||
Fields: []string{"Key"},
|
||||
IdxFunc: DefaultIndexPrefixFunc,
|
||||
},
|
||||
},
|
||||
Decoder: func(buf []byte) interface{} {
|
||||
out := new(structs.DirEntry)
|
||||
if err := structs.Decode(buf, out); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return out
|
||||
},
|
||||
}
|
||||
|
||||
// Store the set of tables
|
||||
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable}
|
||||
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable}
|
||||
for _, table := range s.tables {
|
||||
table.Env = s.env
|
||||
table.Encoder = encoder
|
||||
@ -206,6 +231,8 @@ func (s *StateStore) initialize() error {
|
||||
"NodeChecks": MDBTables{s.checkTable},
|
||||
"ServiceChecks": MDBTables{s.checkTable},
|
||||
"CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
|
||||
"KVSGet": MDBTables{s.kvsTable},
|
||||
"KVSList": MDBTables{s.kvsTable},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -686,6 +713,159 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e
|
||||
return nodes
|
||||
}
|
||||
|
||||
// KVSSet is used to create or update a KV entry
|
||||
func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the existing node
|
||||
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the create and modify times
|
||||
if len(res) == 0 {
|
||||
d.CreateIndex = index
|
||||
} else {
|
||||
d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex
|
||||
}
|
||||
d.ModifyIndex = index
|
||||
|
||||
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.watch[s.kvsTable].Notify()
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// KVSRestore is used to restore a DirEntry. It should only be used when
|
||||
// doing a restore, otherwise KVSSet should be used.
|
||||
func (s *StateStore) KVSRestore(d *structs.DirEntry) error {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// KVSGet is used to get a KV entry
|
||||
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
|
||||
idx, res, err := s.kvsTable.Get("id", key)
|
||||
var d *structs.DirEntry
|
||||
if len(res) > 0 {
|
||||
d = res[0].(*structs.DirEntry)
|
||||
}
|
||||
return idx, d, err
|
||||
}
|
||||
|
||||
// KVSList is used to list all KV entries with a prefix
|
||||
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
|
||||
idx, res, err := s.kvsTable.Get("id_prefix", prefix)
|
||||
ents := make(structs.DirEntries, len(res))
|
||||
for idx, r := range res {
|
||||
ents[idx] = r.(*structs.DirEntry)
|
||||
}
|
||||
return idx, ents, err
|
||||
}
|
||||
|
||||
// KVSDelete is used to delete a KVS entry
|
||||
func (s *StateStore) KVSDelete(index uint64, key string) error {
|
||||
return s.kvsDeleteWithIndex(index, "id", key)
|
||||
}
|
||||
|
||||
// KVSDeleteTree is used to delete all keys with a given prefix
|
||||
func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
|
||||
if prefix == "" {
|
||||
return s.kvsDeleteWithIndex(index, "id")
|
||||
}
|
||||
return s.kvsDeleteWithIndex(index, "id_prefix", prefix)
|
||||
}
|
||||
|
||||
// kvsDeleteWithIndex does a delete with either the id or id_prefix
|
||||
func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if num > 0 {
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.watch[s.kvsTable].Notify()
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// KVSCheckAndSet is used to perform an atomic check-and-set
|
||||
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the existing node
|
||||
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Get the existing node if any
|
||||
var exist *structs.DirEntry
|
||||
if len(res) > 0 {
|
||||
exist = res[0].(*structs.DirEntry)
|
||||
}
|
||||
|
||||
// Use the ModifyIndex as the constraint. A modify of time of 0
|
||||
// means we are doing a set-if-not-exists, while any other value
|
||||
// means we expect that modify time.
|
||||
if d.ModifyIndex == 0 && exist != nil {
|
||||
return false, nil
|
||||
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Set the create and modify times
|
||||
if exist == nil {
|
||||
d.CreateIndex = index
|
||||
} else {
|
||||
d.CreateIndex = exist.CreateIndex
|
||||
}
|
||||
d.ModifyIndex = index
|
||||
|
||||
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer s.watch[s.kvsTable].Notify()
|
||||
return true, tx.Commit()
|
||||
}
|
||||
|
||||
// Snapshot is used to create a point in time snapshot
|
||||
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
||||
// Begin a new txn on all tables
|
||||
@ -742,3 +922,10 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks {
|
||||
_, checks := s.store.parseHealthChecks(s.lastIndex, res, err)
|
||||
return checks
|
||||
}
|
||||
|
||||
// KVSDump is used to list all KV entries. It takes a channel and streams
|
||||
// back *struct.DirEntry objects. This will block and should be invoked
|
||||
// in a goroutine.
|
||||
func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
|
||||
return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
|
||||
}
|
||||
|
@ -550,6 +550,16 @@ func TestStoreSnapshot(t *testing.T) {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
// Add some KVS entries
|
||||
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(14, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(15, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Take a snapshot
|
||||
snap, err := store.Snapshot()
|
||||
if err != nil {
|
||||
@ -558,7 +568,7 @@ func TestStoreSnapshot(t *testing.T) {
|
||||
defer snap.Close()
|
||||
|
||||
// Check the last nodes
|
||||
if idx := snap.LastIndex(); idx != 13 {
|
||||
if idx := snap.LastIndex(); idx != 15 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
|
||||
@ -591,6 +601,28 @@ func TestStoreSnapshot(t *testing.T) {
|
||||
t.Fatalf("bad: %v", checks[0])
|
||||
}
|
||||
|
||||
// Check we have the entries
|
||||
streamCh := make(chan interface{}, 64)
|
||||
doneCh := make(chan struct{})
|
||||
var ents []*structs.DirEntry
|
||||
go func() {
|
||||
for {
|
||||
obj := <-streamCh
|
||||
if obj == nil {
|
||||
close(doneCh)
|
||||
return
|
||||
}
|
||||
ents = append(ents, obj.(*structs.DirEntry))
|
||||
}
|
||||
}()
|
||||
if err := snap.KVSDump(streamCh); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
<-doneCh
|
||||
if len(ents) != 2 {
|
||||
t.Fatalf("missing KVS entries!")
|
||||
}
|
||||
|
||||
// Make some changes!
|
||||
if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
@ -612,6 +644,10 @@ func TestStoreSnapshot(t *testing.T) {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.KVSDelete(18, "/web/a"); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
// Check snapshot has old values
|
||||
nodes = snap.Nodes()
|
||||
if len(nodes) != 2 {
|
||||
@ -639,6 +675,28 @@ func TestStoreSnapshot(t *testing.T) {
|
||||
if !reflect.DeepEqual(checks[0], check) {
|
||||
t.Fatalf("bad: %v", checks[0])
|
||||
}
|
||||
|
||||
// Check we have the entries
|
||||
streamCh = make(chan interface{}, 64)
|
||||
doneCh = make(chan struct{})
|
||||
ents = nil
|
||||
go func() {
|
||||
for {
|
||||
obj := <-streamCh
|
||||
if obj == nil {
|
||||
close(doneCh)
|
||||
return
|
||||
}
|
||||
ents = append(ents, obj.(*structs.DirEntry))
|
||||
}
|
||||
}()
|
||||
if err := snap.KVSDump(streamCh); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
<-doneCh
|
||||
if len(ents) != 2 {
|
||||
t.Fatalf("missing KVS entries!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureCheck(t *testing.T) {
|
||||
@ -933,3 +991,279 @@ func TestSS_Register_Deregister_Query(t *testing.T) {
|
||||
t.Fatalf("Bad: %v", nodes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSSet_Get(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Should not exist
|
||||
idx, d, err := store.KVSGet("/foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 0 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if d != nil {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
|
||||
// Create the entry
|
||||
d = &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1000, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should exist exist
|
||||
idx, d, err = store.KVSGet("/foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1000 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if d.CreateIndex != 1000 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.ModifyIndex != 1000 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.Key != "/foo" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.Flags != 42 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if string(d.Value) != "test" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
|
||||
// Update the entry
|
||||
d = &structs.DirEntry{Key: "/foo", Flags: 43, Value: []byte("zip")}
|
||||
if err := store.KVSSet(1010, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should update
|
||||
idx, d, err = store.KVSGet("/foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1010 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if d.CreateIndex != 1000 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.ModifyIndex != 1010 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.Key != "/foo" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if d.Flags != 43 {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
if string(d.Value) != "zip" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSDelete(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Create the entry
|
||||
d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1000, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Delete the entry
|
||||
if err := store.KVSDelete(1020, "/foo"); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should not exist
|
||||
idx, d, err := store.KVSGet("/foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1020 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if d != nil {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSCheckAndSet(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// CAS should fail, no entry
|
||||
d := &structs.DirEntry{
|
||||
ModifyIndex: 100,
|
||||
Key: "/foo",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
}
|
||||
ok, err := store.KVSCheckAndSet(1000, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// Constrain on not-exist, should work
|
||||
d.ModifyIndex = 0
|
||||
ok, err = store.KVSCheckAndSet(1001, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("expected commit")
|
||||
}
|
||||
|
||||
// Constrain on not-exist, should fail
|
||||
d.ModifyIndex = 0
|
||||
ok, err = store.KVSCheckAndSet(1002, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// Constrain on a wrong modify time
|
||||
d.ModifyIndex = 1000
|
||||
ok, err = store.KVSCheckAndSet(1003, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatalf("unexpected commit")
|
||||
}
|
||||
|
||||
// Constrain on a correct modify time
|
||||
d.ModifyIndex = 1001
|
||||
ok, err = store.KVSCheckAndSet(1004, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("expected commit")
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVS_List(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Should not exist
|
||||
idx, ents, err := store.KVSList("/web")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 0 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if len(ents) != 0 {
|
||||
t.Fatalf("bad: %v", ents)
|
||||
}
|
||||
|
||||
// Create the entries
|
||||
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1000, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1001, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1002, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should list
|
||||
idx, ents, err = store.KVSList("/web")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1002 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if len(ents) != 3 {
|
||||
t.Fatalf("bad: %v", ents)
|
||||
}
|
||||
|
||||
if ents[0].Key != "/web/a" {
|
||||
t.Fatalf("bad: %v", ents[0])
|
||||
}
|
||||
if ents[1].Key != "/web/b" {
|
||||
t.Fatalf("bad: %v", ents[1])
|
||||
}
|
||||
if ents[2].Key != "/web/sub/c" {
|
||||
t.Fatalf("bad: %v", ents[2])
|
||||
}
|
||||
}
|
||||
|
||||
func TestKVSDeleteTree(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
// Should not exist
|
||||
err = store.KVSDeleteTree(1000, "/web")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create the entries
|
||||
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1000, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1001, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")}
|
||||
if err := store.KVSSet(1002, d); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Nuke the web tree
|
||||
err = store.KVSDeleteTree(1010, "/web")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Nothing should list
|
||||
idx, ents, err := store.KVSList("/web")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if idx != 1010 {
|
||||
t.Fatalf("bad: %v", idx)
|
||||
}
|
||||
if len(ents) != 0 {
|
||||
t.Fatalf("bad: %v", ents)
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ type MessageType uint8
|
||||
const (
|
||||
RegisterRequestType MessageType = iota
|
||||
DeregisterRequestType
|
||||
KVSRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
@ -172,6 +173,45 @@ type IndexedCheckServiceNodes struct {
|
||||
Nodes CheckServiceNodes
|
||||
}
|
||||
|
||||
// DirEntry is used to represent a directory entry. This is
|
||||
// used for values in our Key-Value store.
|
||||
type DirEntry struct {
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
Key string
|
||||
Flags uint64
|
||||
Value []byte
|
||||
}
|
||||
type DirEntries []*DirEntry
|
||||
|
||||
type KVSOp string
|
||||
|
||||
const (
|
||||
KVSSet KVSOp = "set"
|
||||
KVSDelete = "delete"
|
||||
KVSDeleteTree = "delete-tree"
|
||||
KVSCAS = "cas" // Check-and-set
|
||||
)
|
||||
|
||||
// KVSRequest is used to operate on the Key-Value store
|
||||
type KVSRequest struct {
|
||||
Datacenter string
|
||||
Op KVSOp // Which operation are we performing
|
||||
DirEnt DirEntry // Which directory entry
|
||||
}
|
||||
|
||||
// KeyRequest is used to request a key, or key prefix
|
||||
type KeyRequest struct {
|
||||
Datacenter string
|
||||
Key string
|
||||
BlockingQuery
|
||||
}
|
||||
|
||||
type IndexedDirEntries struct {
|
||||
Index uint64
|
||||
Entries DirEntries
|
||||
}
|
||||
|
||||
// Decode is used to decode a MsgPack encoded object
|
||||
func Decode(buf []byte, out interface{}) error {
|
||||
var handle codec.MsgpackHandle
|
||||
|
@ -7,11 +7,12 @@ sidebar_current: "docs-agent-http"
|
||||
# HTTP API
|
||||
|
||||
The main interface to Consul is a RESTful HTTP API. The API can be
|
||||
used for CRUD for nodes, services, and checks. The endpoints are
|
||||
used for CRUD for nodes, services, checks, and configuration. The endpoints are
|
||||
versioned to enable changes without breaking backwards compatibility.
|
||||
|
||||
All endpoints fall into one of 4 categories:
|
||||
All endpoints fall into one of 5 categories:
|
||||
|
||||
* kv - Key/Value store
|
||||
* agent - Agent control
|
||||
* catalog - Manages nodes and services
|
||||
* health - Manages health checks
|
||||
@ -28,7 +29,7 @@ Queries that support this will mention it specifically, however the use of this
|
||||
feature is the same for all. If supported, the query will set an HTTP header
|
||||
"X-Consul-Index". This is an opaque handle that the client will use.
|
||||
|
||||
To cause a query to block, the query parameters "?wait=<interval>&index=<idx>" are added
|
||||
To cause a query to block, the query parameters "?wait=\<interval\>&index=\<idx\>" are added
|
||||
to a request. The "?wait=" query parameter limits how long the query will potentially
|
||||
block for. It not set, it will default to 10 minutes. It can be specified in the form of
|
||||
"10s" or "5m", which is 10 seconds or 5 minutes respectively. The "?index=" parameter is an
|
||||
@ -41,6 +42,72 @@ note is that when the query returns there is **no guarantee** of a change. It is
|
||||
possible that the timeout was reached, or that there was an idempotent write that
|
||||
does not affect the result.
|
||||
|
||||
|
||||
## KV
|
||||
|
||||
The KV endpoint is used to expose a simple key/value store. This can be used
|
||||
to store service configurations or other meta data in a simple way. It has only
|
||||
a single endpoint:
|
||||
|
||||
/v1/kv/<key>
|
||||
|
||||
This is the only endpoint that is used with the Key/Value store.
|
||||
It's use depends on the HTTP method. The `GET`, `PUT` and `DELETE` methods
|
||||
are all supported.
|
||||
|
||||
When using the `GET` method, Consul will return the specified key,
|
||||
or if the "?recurse" query parameter is provided, it will return
|
||||
all keys with the given prefix.
|
||||
|
||||
Each object will look like:
|
||||
|
||||
[
|
||||
{
|
||||
"CreateIndex":100,
|
||||
"ModifyIndex":200,
|
||||
"Key":"zip",
|
||||
"Flags":0,
|
||||
"Value":"dGVzdA=="
|
||||
}
|
||||
]
|
||||
|
||||
The `CreateIndex` is the internal index value that represents
|
||||
when the entry was created. The `ModifyIndex` is the last index
|
||||
that modified this key. This index corresponds to the `X-Consul-Index`
|
||||
header value that is returned. A blocking query can be used to wait for
|
||||
a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds
|
||||
to the latest `ModifyIndex` and so a blocking query waits until any of the
|
||||
listed keys are updated.
|
||||
|
||||
The `Key` is simply the full path of the entry. `Flags` are an opaque
|
||||
unsigned integer that can be attached to each entry. The use of this is
|
||||
left totally to the user. Lastly, the `Value` is a base64 key value.
|
||||
|
||||
If no entries are found, a 404 code is returned.
|
||||
|
||||
When using the `PUT` method, Consul expects the request body to be the
|
||||
value corresponding to the key. There are a number of parameters that can
|
||||
be used with a PUT request:
|
||||
|
||||
* ?flags=\<num\> : This can be used to specify an unsigned value between
|
||||
0 and 2^64-1. It is opaque to the user, but a client application may
|
||||
use it.
|
||||
|
||||
* ?cas=\<index\> : This flag is used to turn the `PUT` into a **Check-And-Set**
|
||||
operation. This is very useful as it allows clients to build more complex
|
||||
syncronization primitives on top. If the index is 0, then Consul will only
|
||||
put the key if it does not already exist. If the index is non-zero, then
|
||||
the key is only set if the index matches the `ModifyIndex` of that key.
|
||||
|
||||
The return value is simply either `true` or `false`. If the CAS check fails,
|
||||
then `false` will be returned.
|
||||
|
||||
Lastly, the `DELETE` method can be used to delete a single key or all
|
||||
keys sharing a prefix. If the "?recurse" query parameter is provided,
|
||||
then all keys with the prefix are deleted, otherwise only the specified
|
||||
key.
|
||||
|
||||
|
||||
## Agent
|
||||
|
||||
The Agent endpoints are used to interact with a local Consul agent. Usually,
|
||||
|
@ -35,30 +35,34 @@ Here is an example output:
|
||||
num_peers = 2
|
||||
state = Leader
|
||||
term = 4
|
||||
serf-lan:
|
||||
event-queue = 0
|
||||
event-time = 2
|
||||
serf_lan:
|
||||
event_queue = 0
|
||||
event_time = 2
|
||||
failed = 0
|
||||
intent-queue = 0
|
||||
intent_queue = 0
|
||||
left = 0
|
||||
member-time = 7
|
||||
member_time = 7
|
||||
members = 3
|
||||
serf-wan:
|
||||
event-queue = 0
|
||||
event-time = 1
|
||||
query_queue = 0
|
||||
query_time = 1
|
||||
serf_wan:
|
||||
event_queue = 0
|
||||
event_time = 1
|
||||
failed = 0
|
||||
intent-queue = 0
|
||||
intent_queue = 0
|
||||
left = 0
|
||||
member-time = 1
|
||||
member_time = 1
|
||||
members = 1
|
||||
query_queue = 0
|
||||
query_time = 1
|
||||
|
||||
There are currently the top-level keys for:
|
||||
|
||||
* agent: Provides information about the agent
|
||||
* consul: Information about the consul library (client or server)
|
||||
* raft: Provides info about the Raft [consensus library](/docs/internals/consensus.html)
|
||||
* serf-lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html)
|
||||
* serf-wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html)
|
||||
* serf_lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html)
|
||||
* serf_wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html)
|
||||
|
||||
## Usage
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user