mirror of
https://github.com/status-im/consul.git
synced 2025-01-20 18:50:04 +00:00
Merge pull request #17 from hashicorp/f-kv
Adding simple Key/Value Store
This commit is contained in:
commit
21508292e4
@ -88,6 +88,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
|||||||
s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
|
s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
|
||||||
s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService))
|
s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService))
|
||||||
|
|
||||||
|
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
|
||||||
|
|
||||||
if enableDebug {
|
if enableDebug {
|
||||||
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
|
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||||
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
||||||
|
@ -8,6 +8,7 @@ register new services.
|
|||||||
The URLs are also versioned to allow for changes in the API.
|
The URLs are also versioned to allow for changes in the API.
|
||||||
The current URLs supported are:
|
The current URLs supported are:
|
||||||
|
|
||||||
|
Catalog:
|
||||||
* /v1/catalog/register : Registers a new service
|
* /v1/catalog/register : Registers a new service
|
||||||
* /v1/catalog/deregister : Deregisters a service or node
|
* /v1/catalog/deregister : Deregisters a service or node
|
||||||
* /v1/catalog/datacenters : Lists known datacenters
|
* /v1/catalog/datacenters : Lists known datacenters
|
||||||
@ -16,15 +17,17 @@ The current URLs supported are:
|
|||||||
* /v1/catalog/service/<service>/ : Lists the nodes in a given service
|
* /v1/catalog/service/<service>/ : Lists the nodes in a given service
|
||||||
* /v1/catalog/node/<node>/ : Lists the services provided by a node
|
* /v1/catalog/node/<node>/ : Lists the services provided by a node
|
||||||
|
|
||||||
* Health system:
|
Health system:
|
||||||
* /v1/health/node/<node>: Returns the health info of a node
|
* /v1/health/node/<node>: Returns the health info of a node
|
||||||
* /v1/health/checks/<service>: Returns the checks of a service
|
* /v1/health/checks/<service>: Returns the checks of a service
|
||||||
* /v1/health/service/<service>: Returns the nodes and health info of a service
|
* /v1/health/service/<service>: Returns the nodes and health info of a service
|
||||||
* /v1/health/state/<state>: Returns the checks in a given state
|
* /v1/health/state/<state>: Returns the checks in a given state
|
||||||
|
|
||||||
|
Status:
|
||||||
* /v1/status/leader : Returns the current Raft leader
|
* /v1/status/leader : Returns the current Raft leader
|
||||||
* /v1/status/peers : Returns the current Raft peer set
|
* /v1/status/peers : Returns the current Raft peer set
|
||||||
|
|
||||||
|
Agent:
|
||||||
* /v1/agent/checks: Returns the checks the local agent is managing
|
* /v1/agent/checks: Returns the checks the local agent is managing
|
||||||
* /v1/agent/services : Returns the services local agent is managing
|
* /v1/agent/services : Returns the services local agent is managing
|
||||||
* /v1/agent/members : Returns the members as seen by the local serf agent
|
* /v1/agent/members : Returns the members as seen by the local serf agent
|
||||||
@ -37,3 +40,7 @@ The current URLs supported are:
|
|||||||
* /v1/agent/check/fail/<name>
|
* /v1/agent/check/fail/<name>
|
||||||
* /v1/agent/service/register
|
* /v1/agent/service/register
|
||||||
* /v1/agent/service/deregister/<name>
|
* /v1/agent/service/deregister/<name>
|
||||||
|
|
||||||
|
KVS:
|
||||||
|
* /v1/kv/<key>
|
||||||
|
|
||||||
|
153
command/agent/kvs_endpoint.go
Normal file
153
command/agent/kvs_endpoint.go
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
|
// Set default DC
|
||||||
|
args := structs.KeyRequest{}
|
||||||
|
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pull out the key name, validation left to each sub-handler
|
||||||
|
args.Key = strings.TrimPrefix(req.URL.Path, "/v1/kv/")
|
||||||
|
|
||||||
|
// Switch on the method
|
||||||
|
switch req.Method {
|
||||||
|
case "GET":
|
||||||
|
return s.KVSGet(resp, req, &args)
|
||||||
|
case "PUT":
|
||||||
|
return s.KVSPut(resp, req, &args)
|
||||||
|
case "DELETE":
|
||||||
|
return s.KVSDelete(resp, req, &args)
|
||||||
|
default:
|
||||||
|
resp.WriteHeader(405)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSGet handles a GET request
|
||||||
|
func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
|
||||||
|
// Check for recurse
|
||||||
|
method := "KVS.Get"
|
||||||
|
params := req.URL.Query()
|
||||||
|
if _, ok := params["recurse"]; ok {
|
||||||
|
method = "KVS.List"
|
||||||
|
} else if missingKey(resp, args) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the RPC
|
||||||
|
var out structs.IndexedDirEntries
|
||||||
|
if err := s.agent.RPC(method, &args, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
setIndex(resp, out.Index)
|
||||||
|
|
||||||
|
// Check if we get a not found
|
||||||
|
if len(out.Entries) == 0 {
|
||||||
|
resp.WriteHeader(404)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return out.Entries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSPut handles a PUT request
|
||||||
|
func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
|
||||||
|
if missingKey(resp, args) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
applyReq := structs.KVSRequest{
|
||||||
|
Datacenter: args.Datacenter,
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: args.Key,
|
||||||
|
Flags: 0,
|
||||||
|
Value: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for flags
|
||||||
|
params := req.URL.Query()
|
||||||
|
if _, ok := params["flags"]; ok {
|
||||||
|
flagVal, err := strconv.ParseUint(params.Get("flags"), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
applyReq.DirEnt.Flags = flagVal
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for cas value
|
||||||
|
if _, ok := params["cas"]; ok {
|
||||||
|
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
applyReq.DirEnt.ModifyIndex = casVal
|
||||||
|
applyReq.Op = structs.KVSCAS
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the value
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
if _, err := io.Copy(buf, req.Body); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
applyReq.DirEnt.Value = buf.Bytes()
|
||||||
|
|
||||||
|
// Make the RPC
|
||||||
|
var out bool
|
||||||
|
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only use the out value if this was a CAS
|
||||||
|
if applyReq.Op == structs.KVSSet {
|
||||||
|
return true, nil
|
||||||
|
} else {
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSPut handles a DELETE request
|
||||||
|
func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
|
||||||
|
applyReq := structs.KVSRequest{
|
||||||
|
Datacenter: args.Datacenter,
|
||||||
|
Op: structs.KVSDelete,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: args.Key,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for recurse
|
||||||
|
params := req.URL.Query()
|
||||||
|
if _, ok := params["recurse"]; ok {
|
||||||
|
applyReq.Op = structs.KVSDeleteTree
|
||||||
|
} else if missingKey(resp, args) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the RPC
|
||||||
|
var out bool
|
||||||
|
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// missingKey checks if the key is missing
|
||||||
|
func missingKey(resp http.ResponseWriter, args *structs.KeyRequest) bool {
|
||||||
|
if args.Key == "" {
|
||||||
|
resp.WriteHeader(400)
|
||||||
|
resp.Write([]byte("Missing key name"))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
291
command/agent/kvs_endpoint_test.go
Normal file
291
command/agent/kvs_endpoint_test.go
Normal file
@ -0,0 +1,291 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
|
||||||
|
dir, srv := makeHTTPServer(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer srv.Shutdown()
|
||||||
|
defer srv.agent.Shutdown()
|
||||||
|
|
||||||
|
// Wait for a leader
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
keys := []string{
|
||||||
|
"baz",
|
||||||
|
"bar",
|
||||||
|
"foo/sub1",
|
||||||
|
"foo/sub2",
|
||||||
|
"zip",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
buf := bytes.NewBuffer([]byte("test"))
|
||||||
|
req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if res := obj.(bool); !res {
|
||||||
|
t.Fatalf("should work")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
req, err := http.NewRequest("GET", "/v1/kv/"+key, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
header := resp.Header().Get("X-Consul-Index")
|
||||||
|
if header == "" {
|
||||||
|
t.Fatalf("Bad: %v", header)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, ok := obj.(structs.DirEntries)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("should work")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(res) != 1 {
|
||||||
|
t.Fatalf("bad: %v", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
if res[0].Key != key {
|
||||||
|
t.Fatalf("bad: %v", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
req, err := http.NewRequest("DELETE", "/v1/kv/"+key, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
_, err = srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVSEndpoint_Recurse(t *testing.T) {
|
||||||
|
dir, srv := makeHTTPServer(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer srv.Shutdown()
|
||||||
|
defer srv.agent.Shutdown()
|
||||||
|
|
||||||
|
// Wait for a leader
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
keys := []string{
|
||||||
|
"bar",
|
||||||
|
"baz",
|
||||||
|
"foo/sub1",
|
||||||
|
"foo/sub2",
|
||||||
|
"zip",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
buf := bytes.NewBuffer([]byte("test"))
|
||||||
|
req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if res := obj.(bool); !res {
|
||||||
|
t.Fatalf("should work")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Get all the keys
|
||||||
|
req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
header := resp.Header().Get("X-Consul-Index")
|
||||||
|
if header == "" {
|
||||||
|
t.Fatalf("Bad: %v", header)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, ok := obj.(structs.DirEntries)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("should work")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(res) != len(keys) {
|
||||||
|
t.Fatalf("bad: %v", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
for idx, key := range keys {
|
||||||
|
if res[idx].Key != key {
|
||||||
|
t.Fatalf("bad: %v %v", res[idx].Key, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
req, err := http.NewRequest("DELETE", "/v1/kv/?recurse", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
_, err = srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Get all the keys
|
||||||
|
req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj != nil {
|
||||||
|
t.Fatalf("bad: %v", obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVSEndpoint_CAS(t *testing.T) {
|
||||||
|
dir, srv := makeHTTPServer(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer srv.Shutdown()
|
||||||
|
defer srv.agent.Shutdown()
|
||||||
|
|
||||||
|
// Wait for a leader
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
{
|
||||||
|
buf := bytes.NewBuffer([]byte("test"))
|
||||||
|
req, err := http.NewRequest("PUT", "/v1/kv/test?flags=50", buf)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if res := obj.(bool); !res {
|
||||||
|
t.Fatalf("should work")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", "/v1/kv/test", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d := obj.(structs.DirEntries)[0]
|
||||||
|
|
||||||
|
// Check the flags
|
||||||
|
if d.Flags != 50 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a CAS request, bad index
|
||||||
|
{
|
||||||
|
buf := bytes.NewBuffer([]byte("zip"))
|
||||||
|
req, err := http.NewRequest("PUT",
|
||||||
|
fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex-1), buf)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if res := obj.(bool); res {
|
||||||
|
t.Fatalf("should NOT work")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a CAS request, good index
|
||||||
|
{
|
||||||
|
buf := bytes.NewBuffer([]byte("zip"))
|
||||||
|
req, err := http.NewRequest("PUT",
|
||||||
|
fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex), buf)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := srv.KVSEndpoint(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if res := obj.(bool); !res {
|
||||||
|
t.Fatalf("should work")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the update
|
||||||
|
req, _ = http.NewRequest("GET", "/v1/kv/test", nil)
|
||||||
|
resp = httptest.NewRecorder()
|
||||||
|
obj, _ = srv.KVSEndpoint(resp, req)
|
||||||
|
d = obj.(structs.DirEntries)[0]
|
||||||
|
|
||||||
|
if d.Flags != 42 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if string(d.Value) != "zip" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
}
|
@ -65,6 +65,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||||||
return c.decodeRegister(buf[1:], log.Index)
|
return c.decodeRegister(buf[1:], log.Index)
|
||||||
case structs.DeregisterRequestType:
|
case structs.DeregisterRequestType:
|
||||||
return c.applyDeregister(buf[1:], log.Index)
|
return c.applyDeregister(buf[1:], log.Index)
|
||||||
|
case structs.KVSRequestType:
|
||||||
|
return c.applyKVSOperation(buf[1:], log.Index)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||||
}
|
}
|
||||||
@ -131,6 +133,32 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
|
||||||
|
var req structs.KVSRequest
|
||||||
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
|
}
|
||||||
|
switch req.Op {
|
||||||
|
case structs.KVSSet:
|
||||||
|
return c.state.KVSSet(index, &req.DirEnt)
|
||||||
|
case structs.KVSDelete:
|
||||||
|
return c.state.KVSDelete(index, req.DirEnt.Key)
|
||||||
|
case structs.KVSDeleteTree:
|
||||||
|
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
|
||||||
|
case structs.KVSCAS:
|
||||||
|
act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
return act
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
|
||||||
|
return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||||
@ -152,6 +180,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
c.state.Close()
|
||||||
c.state = state
|
c.state = state
|
||||||
|
|
||||||
// Create a decoder
|
// Create a decoder
|
||||||
@ -184,6 +213,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|||||||
}
|
}
|
||||||
c.applyRegister(&req, header.LastIndex)
|
c.applyRegister(&req, header.LastIndex)
|
||||||
|
|
||||||
|
case structs.KVSRequestType:
|
||||||
|
var req structs.DirEntry
|
||||||
|
if err := dec.Decode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := c.state.KVSRestore(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
||||||
}
|
}
|
||||||
@ -247,6 +285,38 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enable GC of the ndoes
|
||||||
|
nodes = nil
|
||||||
|
|
||||||
|
// Dump the KVS entries
|
||||||
|
streamCh := make(chan interface{}, 256)
|
||||||
|
errorCh := make(chan error)
|
||||||
|
go func() {
|
||||||
|
if err := s.state.KVSDump(streamCh); err != nil {
|
||||||
|
errorCh <- err
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
OUTER:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case raw := <-streamCh:
|
||||||
|
if raw == nil {
|
||||||
|
break OUTER
|
||||||
|
}
|
||||||
|
sink.Write([]byte{byte(structs.KVSRequestType)})
|
||||||
|
if err := encoder.Encode(raw); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
case err := <-errorCh:
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -322,6 +322,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||||||
Status: structs.HealthPassing,
|
Status: structs.HealthPassing,
|
||||||
ServiceID: "web",
|
ServiceID: "web",
|
||||||
})
|
})
|
||||||
|
fsm.state.KVSSet(8, &structs.DirEntry{
|
||||||
|
Key: "/test",
|
||||||
|
Value: []byte("foo"),
|
||||||
|
})
|
||||||
|
|
||||||
// Snapshot
|
// Snapshot
|
||||||
snap, err := fsm.Snapshot()
|
snap, err := fsm.Snapshot()
|
||||||
@ -370,4 +374,198 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||||||
if len(checks) != 1 {
|
if len(checks) != 1 {
|
||||||
t.Fatalf("Bad: %v", checks)
|
t.Fatalf("Bad: %v", checks)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify key is set
|
||||||
|
_, d, err := fsm.state.KVSGet("/test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if string(d.Value) != "foo" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFSM_KVSSet(t *testing.T) {
|
||||||
|
fsm, err := NewFSM(os.Stderr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer fsm.Close()
|
||||||
|
|
||||||
|
req := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "/test/path",
|
||||||
|
Flags: 0,
|
||||||
|
Value: []byte("test"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
resp := fsm.Apply(makeLog(buf))
|
||||||
|
if resp != nil {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify key is set
|
||||||
|
_, d, err := fsm.state.KVSGet("/test/path")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if d == nil {
|
||||||
|
t.Fatalf("missing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFSM_KVSDelete(t *testing.T) {
|
||||||
|
fsm, err := NewFSM(os.Stderr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer fsm.Close()
|
||||||
|
|
||||||
|
req := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "/test/path",
|
||||||
|
Flags: 0,
|
||||||
|
Value: []byte("test"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
resp := fsm.Apply(makeLog(buf))
|
||||||
|
if resp != nil {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the delete
|
||||||
|
req.Op = structs.KVSDelete
|
||||||
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
resp = fsm.Apply(makeLog(buf))
|
||||||
|
if resp != nil {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify key is not set
|
||||||
|
_, d, err := fsm.state.KVSGet("/test/path")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if d != nil {
|
||||||
|
t.Fatalf("key present")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFSM_KVSDeleteTree(t *testing.T) {
|
||||||
|
fsm, err := NewFSM(os.Stderr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer fsm.Close()
|
||||||
|
|
||||||
|
req := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "/test/path",
|
||||||
|
Flags: 0,
|
||||||
|
Value: []byte("test"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
resp := fsm.Apply(makeLog(buf))
|
||||||
|
if resp != nil {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the delete tree
|
||||||
|
req.Op = structs.KVSDeleteTree
|
||||||
|
req.DirEnt.Key = "/test"
|
||||||
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
resp = fsm.Apply(makeLog(buf))
|
||||||
|
if resp != nil {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify key is not set
|
||||||
|
_, d, err := fsm.state.KVSGet("/test/path")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if d != nil {
|
||||||
|
t.Fatalf("key present")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFSM_KVSCheckAndSet(t *testing.T) {
|
||||||
|
fsm, err := NewFSM(os.Stderr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer fsm.Close()
|
||||||
|
|
||||||
|
req := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "/test/path",
|
||||||
|
Flags: 0,
|
||||||
|
Value: []byte("test"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
resp := fsm.Apply(makeLog(buf))
|
||||||
|
if resp != nil {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify key is set
|
||||||
|
_, d, err := fsm.state.KVSGet("/test/path")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if d == nil {
|
||||||
|
t.Fatalf("key missing")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the check-and-set
|
||||||
|
req.Op = structs.KVSCAS
|
||||||
|
req.DirEnt.ModifyIndex = d.ModifyIndex
|
||||||
|
req.DirEnt.Value = []byte("zip")
|
||||||
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
resp = fsm.Apply(makeLog(buf))
|
||||||
|
if resp.(bool) != true {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify key is updated
|
||||||
|
_, d, err = fsm.state.KVSGet("/test/path")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if string(d.Value) != "zip" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
115
consul/kvs_endpoint.go
Normal file
115
consul/kvs_endpoint.go
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KVS endpoint is used to manipulate the Key-Value store
|
||||||
|
type KVS struct {
|
||||||
|
srv *Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply is used to apply a KVS request to the data store. This should
|
||||||
|
// only be used for operations that modify the data
|
||||||
|
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
||||||
|
if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
|
||||||
|
|
||||||
|
// Verify the args
|
||||||
|
if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree {
|
||||||
|
return fmt.Errorf("Must provide key")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the update
|
||||||
|
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
|
||||||
|
if err != nil {
|
||||||
|
k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if respErr, ok := resp.(error); ok {
|
||||||
|
return respErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the return type is a bool
|
||||||
|
if respBool, ok := resp.(bool); ok {
|
||||||
|
*reply = respBool
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get is used to lookup a single key
|
||||||
|
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
|
||||||
|
if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the local state
|
||||||
|
state := k.srv.fsm.State()
|
||||||
|
return k.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
state.QueryTables("KVSGet"),
|
||||||
|
func() (uint64, error) {
|
||||||
|
index, ent, err := state.KVSGet(args.Key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if ent == nil {
|
||||||
|
// Must provide non-zero index to prevent blocking
|
||||||
|
// Index 1 is impossible anyways (due to Raft internals)
|
||||||
|
if index == 0 {
|
||||||
|
reply.Index = 1
|
||||||
|
} else {
|
||||||
|
reply.Index = index
|
||||||
|
}
|
||||||
|
reply.Entries = nil
|
||||||
|
} else {
|
||||||
|
reply.Index = ent.ModifyIndex
|
||||||
|
reply.Entries = structs.DirEntries{ent}
|
||||||
|
}
|
||||||
|
return reply.Index, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// List is used to list all keys with a given prefix
|
||||||
|
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
|
||||||
|
if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the local state
|
||||||
|
state := k.srv.fsm.State()
|
||||||
|
return k.srv.blockingRPC(&args.BlockingQuery,
|
||||||
|
state.QueryTables("KVSList"),
|
||||||
|
func() (uint64, error) {
|
||||||
|
index, ent, err := state.KVSList(args.Key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if len(ent) == 0 {
|
||||||
|
// Must provide non-zero index to prevent blocking
|
||||||
|
// Index 1 is impossible anyways (due to Raft internals)
|
||||||
|
if index == 0 {
|
||||||
|
reply.Index = 1
|
||||||
|
} else {
|
||||||
|
reply.Index = index
|
||||||
|
}
|
||||||
|
reply.Entries = nil
|
||||||
|
} else {
|
||||||
|
// Determine the maximum affected index
|
||||||
|
var maxIndex uint64
|
||||||
|
for _, e := range ent {
|
||||||
|
if e.ModifyIndex > maxIndex {
|
||||||
|
maxIndex = e.ModifyIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Index = maxIndex
|
||||||
|
reply.Entries = ent
|
||||||
|
}
|
||||||
|
return reply.Index, nil
|
||||||
|
})
|
||||||
|
}
|
173
consul/kvs_endpoint_test.go
Normal file
173
consul/kvs_endpoint_test.go
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestKVS_Apply(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client := rpcClient(t, s1)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Wait for leader
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
arg := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "test",
|
||||||
|
Flags: 42,
|
||||||
|
Value: []byte("test"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out bool
|
||||||
|
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify
|
||||||
|
state := s1.fsm.State()
|
||||||
|
_, d, err := state.KVSGet("test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if d == nil {
|
||||||
|
t.Fatalf("should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do a check and set
|
||||||
|
arg.Op = structs.KVSCAS
|
||||||
|
arg.DirEnt.ModifyIndex = d.ModifyIndex
|
||||||
|
arg.DirEnt.Flags = 43
|
||||||
|
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check this was applied
|
||||||
|
if out != true {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify
|
||||||
|
_, d, err = state.KVSGet("test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if d.Flags != 43 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVS_Get(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client := rpcClient(t, s1)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Wait for leader
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
arg := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "test",
|
||||||
|
Flags: 42,
|
||||||
|
Value: []byte("test"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out bool
|
||||||
|
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
getR := structs.KeyRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Key: "test",
|
||||||
|
}
|
||||||
|
var dirent structs.IndexedDirEntries
|
||||||
|
if err := client.Call("KVS.Get", &getR, &dirent); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dirent.Index == 0 {
|
||||||
|
t.Fatalf("Bad: %v", dirent)
|
||||||
|
}
|
||||||
|
if len(dirent.Entries) != 1 {
|
||||||
|
t.Fatalf("Bad: %v", dirent)
|
||||||
|
}
|
||||||
|
d := dirent.Entries[0]
|
||||||
|
if d.Flags != 42 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if string(d.Value) != "test" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVSEndpoint_List(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client := rpcClient(t, s1)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Wait for leader
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
keys := []string{
|
||||||
|
"/test/key1",
|
||||||
|
"/test/key2",
|
||||||
|
"/test/sub/key3",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
arg := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSSet,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: key,
|
||||||
|
Flags: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out bool
|
||||||
|
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getR := structs.KeyRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Key: "/test",
|
||||||
|
}
|
||||||
|
var dirent structs.IndexedDirEntries
|
||||||
|
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dirent.Index == 0 {
|
||||||
|
t.Fatalf("Bad: %v", dirent)
|
||||||
|
}
|
||||||
|
if len(dirent.Entries) != 3 {
|
||||||
|
t.Fatalf("Bad: %v", dirent.Entries)
|
||||||
|
}
|
||||||
|
for i := 0; i < len(dirent.Entries); i++ {
|
||||||
|
d := dirent.Entries[i]
|
||||||
|
if d.Key != keys[i] {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.Flags != 1 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.Value != nil {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -49,10 +49,13 @@ type MDBIndex struct {
|
|||||||
Unique bool // Controls if values are unique
|
Unique bool // Controls if values are unique
|
||||||
Fields []string // Fields are used to build the index
|
Fields []string // Fields are used to build the index
|
||||||
IdxFunc IndexFunc // Can be used to provide custom indexing
|
IdxFunc IndexFunc // Can be used to provide custom indexing
|
||||||
|
Virtual bool // Virtual index does not exist, but can be used for queries
|
||||||
|
RealIndex string // Virtual indexes use a RealIndex for iteration
|
||||||
|
|
||||||
table *MDBTable
|
table *MDBTable
|
||||||
name string
|
name string
|
||||||
dbiName string
|
dbiName string
|
||||||
|
realIndex *MDBIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
// MDBTxn is used to wrap an underlying transaction
|
// MDBTxn is used to wrap an underlying transaction
|
||||||
@ -88,6 +91,17 @@ func DefaultIndexFunc(idx *MDBIndex, parts []string) string {
|
|||||||
return prefix
|
return prefix
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DefaultIndexPrefixFunc can be used with DefaultIndexFunc to scan
|
||||||
|
// for index prefix values. This should only be used as part of a
|
||||||
|
// virtual index.
|
||||||
|
func DefaultIndexPrefixFunc(idx *MDBIndex, parts []string) string {
|
||||||
|
if len(parts) == 0 {
|
||||||
|
return "_"
|
||||||
|
}
|
||||||
|
prefix := "_" + strings.Join(parts, "||")
|
||||||
|
return prefix
|
||||||
|
}
|
||||||
|
|
||||||
// Init is used to initialize the MDBTable and ensure it's ready
|
// Init is used to initialize the MDBTable and ensure it's ready
|
||||||
func (t *MDBTable) Init() error {
|
func (t *MDBTable) Init() error {
|
||||||
if t.Env == nil {
|
if t.Env == nil {
|
||||||
@ -111,6 +125,9 @@ func (t *MDBTable) Init() error {
|
|||||||
if id.AllowBlank {
|
if id.AllowBlank {
|
||||||
return fmt.Errorf("id index must not allow blanks")
|
return fmt.Errorf("id index must not allow blanks")
|
||||||
}
|
}
|
||||||
|
if id.Virtual {
|
||||||
|
return fmt.Errorf("id index cannot be virtual")
|
||||||
|
}
|
||||||
|
|
||||||
// Create the table
|
// Create the table
|
||||||
if err := t.createTable(); err != nil {
|
if err := t.createTable(); err != nil {
|
||||||
@ -221,6 +238,9 @@ EXTEND:
|
|||||||
mdbTxn.dbis[t.Name] = dbi
|
mdbTxn.dbis[t.Name] = dbi
|
||||||
|
|
||||||
for _, index := range t.Indexes {
|
for _, index := range t.Indexes {
|
||||||
|
if index.Virtual {
|
||||||
|
continue
|
||||||
|
}
|
||||||
dbi, err := index.openDBI(tx)
|
dbi, err := index.openDBI(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Abort()
|
tx.Abort()
|
||||||
@ -237,6 +257,9 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) {
|
|||||||
// Construct the indexes keys
|
// Construct the indexes keys
|
||||||
indexes := make(map[string][]byte)
|
indexes := make(map[string][]byte)
|
||||||
for name, index := range t.Indexes {
|
for name, index := range t.Indexes {
|
||||||
|
if index.Virtual {
|
||||||
|
continue
|
||||||
|
}
|
||||||
key, err := index.keyFromObject(obj)
|
key, err := index.keyFromObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -301,6 +324,9 @@ AFTER_DELETE:
|
|||||||
|
|
||||||
// Insert the new indexes
|
// Insert the new indexes
|
||||||
for name, index := range t.Indexes {
|
for name, index := range t.Indexes {
|
||||||
|
if index.Virtual {
|
||||||
|
continue
|
||||||
|
}
|
||||||
dbi := tx.dbis[index.dbiName]
|
dbi := tx.dbis[index.dbiName]
|
||||||
if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil {
|
if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -350,6 +376,29 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac
|
|||||||
return results, err
|
return results, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamTxn is like GetTxn but it streams the results over a channel.
|
||||||
|
// This can be used if the expected data set is very large. The stream
|
||||||
|
// is always closed on return.
|
||||||
|
func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error {
|
||||||
|
// Always close the stream on return
|
||||||
|
defer close(stream)
|
||||||
|
|
||||||
|
// Get the associated index
|
||||||
|
idx, key, err := t.getIndex(index, parts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream the results
|
||||||
|
err = idx.iterate(tx, key, func(encRowId, res []byte) bool {
|
||||||
|
obj := t.Decoder(res)
|
||||||
|
stream <- obj
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// getIndex is used to get the proper index, and also check the arity
|
// getIndex is used to get the proper index, and also check the arity
|
||||||
func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) {
|
func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) {
|
||||||
// Get the index
|
// Get the index
|
||||||
@ -427,6 +476,12 @@ func (t *MDBTable) deleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (num i
|
|||||||
if name == idx.name {
|
if name == idx.name {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if idx.Virtual && name == idx.RealIndex {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if otherIdx.Virtual {
|
||||||
|
continue
|
||||||
|
}
|
||||||
dbi := tx.dbis[otherIdx.dbiName]
|
dbi := tx.dbis[otherIdx.dbiName]
|
||||||
if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil {
|
if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -464,11 +519,23 @@ func (i *MDBIndex) init(table *MDBTable, name string) error {
|
|||||||
if err := i.createIndex(); err != nil {
|
if err := i.createIndex(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Verify real index exists
|
||||||
|
if i.Virtual {
|
||||||
|
if realIndex, ok := table.Indexes[i.RealIndex]; !ok {
|
||||||
|
return fmt.Errorf("real index '%s' missing", i.RealIndex)
|
||||||
|
} else {
|
||||||
|
i.realIndex = realIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// createIndex is used to ensure the index exists
|
// createIndex is used to ensure the index exists
|
||||||
func (i *MDBIndex) createIndex() error {
|
func (i *MDBIndex) createIndex() error {
|
||||||
|
// Do not create if this is a virtual index
|
||||||
|
if i.Virtual {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
tx, err := i.table.Env.BeginTxn(nil, 0)
|
tx, err := i.table.Env.BeginTxn(nil, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -529,7 +596,14 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte {
|
|||||||
func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
|
func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
|
||||||
cb func(encRowId, res []byte) bool) error {
|
cb func(encRowId, res []byte) bool) error {
|
||||||
table := tx.dbis[i.table.Name]
|
table := tx.dbis[i.table.Name]
|
||||||
dbi := tx.dbis[i.dbiName]
|
|
||||||
|
// If virtual, use the correct DBI
|
||||||
|
var dbi mdb.DBI
|
||||||
|
if i.Virtual {
|
||||||
|
dbi = tx.dbis[i.realIndex.dbiName]
|
||||||
|
} else {
|
||||||
|
dbi = tx.dbis[i.dbiName]
|
||||||
|
}
|
||||||
|
|
||||||
cursor, err := tx.tx.CursorOpen(dbi)
|
cursor, err := tx.tx.CursorOpen(dbi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -781,3 +781,194 @@ func TestMDBTableDelete_Prefix(t *testing.T) {
|
|||||||
t.Fatalf("expect 2 result: %#v", res)
|
t.Fatalf("expect 2 result: %#v", res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMDBTableVirtualIndex(t *testing.T) {
|
||||||
|
dir, env := testMDBEnv(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer env.Close()
|
||||||
|
|
||||||
|
table := &MDBTable{
|
||||||
|
Env: env,
|
||||||
|
Name: "test",
|
||||||
|
Indexes: map[string]*MDBIndex{
|
||||||
|
"id": &MDBIndex{
|
||||||
|
Unique: true,
|
||||||
|
Fields: []string{"First"},
|
||||||
|
},
|
||||||
|
"id_prefix": &MDBIndex{
|
||||||
|
Virtual: true,
|
||||||
|
RealIndex: "id",
|
||||||
|
Fields: []string{"First"},
|
||||||
|
IdxFunc: DefaultIndexPrefixFunc,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Encoder: MockEncoder,
|
||||||
|
Decoder: MockDecoder,
|
||||||
|
}
|
||||||
|
if err := table.Init(); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if table.lastRowID != 0 {
|
||||||
|
t.Fatalf("bad last row id: %d", table.lastRowID)
|
||||||
|
}
|
||||||
|
|
||||||
|
objs := []*MockData{
|
||||||
|
&MockData{
|
||||||
|
Key: "1",
|
||||||
|
First: "Jack",
|
||||||
|
Last: "Smith",
|
||||||
|
Country: "USA",
|
||||||
|
},
|
||||||
|
&MockData{
|
||||||
|
Key: "2",
|
||||||
|
First: "John",
|
||||||
|
Last: "Wang",
|
||||||
|
Country: "USA",
|
||||||
|
},
|
||||||
|
&MockData{
|
||||||
|
Key: "3",
|
||||||
|
First: "James",
|
||||||
|
Last: "Torres",
|
||||||
|
Country: "Mexico",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert some mock objects
|
||||||
|
for idx, obj := range objs {
|
||||||
|
if err := table.Insert(obj); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if err := table.SetLastIndex(uint64(4 * idx)); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if table.lastRowID != 3 {
|
||||||
|
t.Fatalf("bad last row id: %d", table.lastRowID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if idx, _ := table.LastIndex(); idx != 8 {
|
||||||
|
t.Fatalf("bad last idx: %d", idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, res, err := table.Get("id_prefix", "J")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(res) != 3 {
|
||||||
|
t.Fatalf("expect 3 result: %#v", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, res, err = table.Get("id_prefix", "Ja")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(res) != 2 {
|
||||||
|
t.Fatalf("expect 2 result: %#v", res)
|
||||||
|
}
|
||||||
|
|
||||||
|
num, err := table.Delete("id_prefix", "Ja")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if num != 2 {
|
||||||
|
t.Fatalf("expect 2 result: %#v", num)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, res, err = table.Get("id_prefix", "J")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(res) != 1 {
|
||||||
|
t.Fatalf("expect 1 result: %#v", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMDBTableStream(t *testing.T) {
|
||||||
|
dir, env := testMDBEnv(t)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer env.Close()
|
||||||
|
|
||||||
|
table := &MDBTable{
|
||||||
|
Env: env,
|
||||||
|
Name: "test",
|
||||||
|
Indexes: map[string]*MDBIndex{
|
||||||
|
"id": &MDBIndex{
|
||||||
|
Unique: true,
|
||||||
|
Fields: []string{"Key"},
|
||||||
|
},
|
||||||
|
"name": &MDBIndex{
|
||||||
|
Fields: []string{"First", "Last"},
|
||||||
|
},
|
||||||
|
"country": &MDBIndex{
|
||||||
|
Fields: []string{"Country"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Encoder: MockEncoder,
|
||||||
|
Decoder: MockDecoder,
|
||||||
|
}
|
||||||
|
if err := table.Init(); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objs := []*MockData{
|
||||||
|
&MockData{
|
||||||
|
Key: "1",
|
||||||
|
First: "Kevin",
|
||||||
|
Last: "Smith",
|
||||||
|
Country: "USA",
|
||||||
|
},
|
||||||
|
&MockData{
|
||||||
|
Key: "2",
|
||||||
|
First: "Kevin",
|
||||||
|
Last: "Wang",
|
||||||
|
Country: "USA",
|
||||||
|
},
|
||||||
|
&MockData{
|
||||||
|
Key: "3",
|
||||||
|
First: "Bernardo",
|
||||||
|
Last: "Torres",
|
||||||
|
Country: "Mexico",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert some mock objects
|
||||||
|
for idx, obj := range objs {
|
||||||
|
if err := table.Insert(obj); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if err := table.SetLastIndex(uint64(idx + 1)); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a readonly txn
|
||||||
|
tx, err := table.StartTxn(true, nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Stream the records
|
||||||
|
streamCh := make(chan interface{})
|
||||||
|
go func() {
|
||||||
|
if err := table.StreamTxn(streamCh, tx, "id"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Verify we get them all
|
||||||
|
idx := 0
|
||||||
|
for obj := range streamCh {
|
||||||
|
p := obj.(*MockData)
|
||||||
|
if !reflect.DeepEqual(p, objs[idx]) {
|
||||||
|
t.Fatalf("bad: %#v %#v", p, objs[idx])
|
||||||
|
}
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
|
||||||
|
if idx != 3 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -18,3 +18,8 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error {
|
|||||||
future := r.server.raft.RemovePeer(peer)
|
future := r.server.raft.RemovePeer(peer)
|
||||||
return future.Error()
|
return future.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Raft) Snapshot(args struct{}, reply *struct{}) error {
|
||||||
|
future := r.server.raft.Snapshot()
|
||||||
|
return future.Error()
|
||||||
|
}
|
||||||
|
@ -101,6 +101,7 @@ type endpoints struct {
|
|||||||
Health *Health
|
Health *Health
|
||||||
Raft *Raft
|
Raft *Raft
|
||||||
Status *Status
|
Status *Status
|
||||||
|
KVS *KVS
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer is used to construct a new Consul server from the
|
// NewServer is used to construct a new Consul server from the
|
||||||
@ -276,12 +277,14 @@ func (s *Server) setupRPC() error {
|
|||||||
s.endpoints.Raft = &Raft{s}
|
s.endpoints.Raft = &Raft{s}
|
||||||
s.endpoints.Catalog = &Catalog{s}
|
s.endpoints.Catalog = &Catalog{s}
|
||||||
s.endpoints.Health = &Health{s}
|
s.endpoints.Health = &Health{s}
|
||||||
|
s.endpoints.KVS = &KVS{s}
|
||||||
|
|
||||||
// Register the handlers
|
// Register the handlers
|
||||||
s.rpcServer.Register(s.endpoints.Status)
|
s.rpcServer.Register(s.endpoints.Status)
|
||||||
s.rpcServer.Register(s.endpoints.Raft)
|
s.rpcServer.Register(s.endpoints.Raft)
|
||||||
s.rpcServer.Register(s.endpoints.Catalog)
|
s.rpcServer.Register(s.endpoints.Catalog)
|
||||||
s.rpcServer.Register(s.endpoints.Health)
|
s.rpcServer.Register(s.endpoints.Health)
|
||||||
|
s.rpcServer.Register(s.endpoints.KVS)
|
||||||
|
|
||||||
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -14,7 +14,8 @@ const (
|
|||||||
dbNodes = "nodes"
|
dbNodes = "nodes"
|
||||||
dbServices = "services"
|
dbServices = "services"
|
||||||
dbChecks = "checks"
|
dbChecks = "checks"
|
||||||
dbMaxMapSize = 128 * 1024 * 1024 // 128MB maximum size
|
dbKVS = "kvs"
|
||||||
|
dbMaxMapSize = 512 * 1024 * 1024 // 512MB maximum size
|
||||||
)
|
)
|
||||||
|
|
||||||
// The StateStore is responsible for maintaining all the Consul
|
// The StateStore is responsible for maintaining all the Consul
|
||||||
@ -31,6 +32,7 @@ type StateStore struct {
|
|||||||
nodeTable *MDBTable
|
nodeTable *MDBTable
|
||||||
serviceTable *MDBTable
|
serviceTable *MDBTable
|
||||||
checkTable *MDBTable
|
checkTable *MDBTable
|
||||||
|
kvsTable *MDBTable
|
||||||
tables MDBTables
|
tables MDBTables
|
||||||
watch map[*MDBTable]*NotifyGroup
|
watch map[*MDBTable]*NotifyGroup
|
||||||
queryTables map[string]MDBTables
|
queryTables map[string]MDBTables
|
||||||
@ -183,8 +185,31 @@ func (s *StateStore) initialize() error {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.kvsTable = &MDBTable{
|
||||||
|
Name: dbKVS,
|
||||||
|
Indexes: map[string]*MDBIndex{
|
||||||
|
"id": &MDBIndex{
|
||||||
|
Unique: true,
|
||||||
|
Fields: []string{"Key"},
|
||||||
|
},
|
||||||
|
"id_prefix": &MDBIndex{
|
||||||
|
Virtual: true,
|
||||||
|
RealIndex: "id",
|
||||||
|
Fields: []string{"Key"},
|
||||||
|
IdxFunc: DefaultIndexPrefixFunc,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Decoder: func(buf []byte) interface{} {
|
||||||
|
out := new(structs.DirEntry)
|
||||||
|
if err := structs.Decode(buf, out); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// Store the set of tables
|
// Store the set of tables
|
||||||
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable}
|
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable}
|
||||||
for _, table := range s.tables {
|
for _, table := range s.tables {
|
||||||
table.Env = s.env
|
table.Env = s.env
|
||||||
table.Encoder = encoder
|
table.Encoder = encoder
|
||||||
@ -206,6 +231,8 @@ func (s *StateStore) initialize() error {
|
|||||||
"NodeChecks": MDBTables{s.checkTable},
|
"NodeChecks": MDBTables{s.checkTable},
|
||||||
"ServiceChecks": MDBTables{s.checkTable},
|
"ServiceChecks": MDBTables{s.checkTable},
|
||||||
"CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
|
"CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
|
||||||
|
"KVSGet": MDBTables{s.kvsTable},
|
||||||
|
"KVSList": MDBTables{s.kvsTable},
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -686,6 +713,159 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e
|
|||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVSSet is used to create or update a KV entry
|
||||||
|
func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error {
|
||||||
|
// Start a new txn
|
||||||
|
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the existing node
|
||||||
|
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the create and modify times
|
||||||
|
if len(res) == 0 {
|
||||||
|
d.CreateIndex = index
|
||||||
|
} else {
|
||||||
|
d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex
|
||||||
|
}
|
||||||
|
d.ModifyIndex = index
|
||||||
|
|
||||||
|
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer s.watch[s.kvsTable].Notify()
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSRestore is used to restore a DirEntry. It should only be used when
|
||||||
|
// doing a restore, otherwise KVSSet should be used.
|
||||||
|
func (s *StateStore) KVSRestore(d *structs.DirEntry) error {
|
||||||
|
// Start a new txn
|
||||||
|
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSGet is used to get a KV entry
|
||||||
|
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
|
||||||
|
idx, res, err := s.kvsTable.Get("id", key)
|
||||||
|
var d *structs.DirEntry
|
||||||
|
if len(res) > 0 {
|
||||||
|
d = res[0].(*structs.DirEntry)
|
||||||
|
}
|
||||||
|
return idx, d, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSList is used to list all KV entries with a prefix
|
||||||
|
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
|
||||||
|
idx, res, err := s.kvsTable.Get("id_prefix", prefix)
|
||||||
|
ents := make(structs.DirEntries, len(res))
|
||||||
|
for idx, r := range res {
|
||||||
|
ents[idx] = r.(*structs.DirEntry)
|
||||||
|
}
|
||||||
|
return idx, ents, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSDelete is used to delete a KVS entry
|
||||||
|
func (s *StateStore) KVSDelete(index uint64, key string) error {
|
||||||
|
return s.kvsDeleteWithIndex(index, "id", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSDeleteTree is used to delete all keys with a given prefix
|
||||||
|
func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
|
||||||
|
if prefix == "" {
|
||||||
|
return s.kvsDeleteWithIndex(index, "id")
|
||||||
|
}
|
||||||
|
return s.kvsDeleteWithIndex(index, "id_prefix", prefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
// kvsDeleteWithIndex does a delete with either the id or id_prefix
|
||||||
|
func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error {
|
||||||
|
// Start a new txn
|
||||||
|
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if num > 0 {
|
||||||
|
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer s.watch[s.kvsTable].Notify()
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// KVSCheckAndSet is used to perform an atomic check-and-set
|
||||||
|
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
|
||||||
|
// Start a new txn
|
||||||
|
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the existing node
|
||||||
|
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the existing node if any
|
||||||
|
var exist *structs.DirEntry
|
||||||
|
if len(res) > 0 {
|
||||||
|
exist = res[0].(*structs.DirEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the ModifyIndex as the constraint. A modify of time of 0
|
||||||
|
// means we are doing a set-if-not-exists, while any other value
|
||||||
|
// means we expect that modify time.
|
||||||
|
if d.ModifyIndex == 0 && exist != nil {
|
||||||
|
return false, nil
|
||||||
|
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the create and modify times
|
||||||
|
if exist == nil {
|
||||||
|
d.CreateIndex = index
|
||||||
|
} else {
|
||||||
|
d.CreateIndex = exist.CreateIndex
|
||||||
|
}
|
||||||
|
d.ModifyIndex = index
|
||||||
|
|
||||||
|
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer s.watch[s.kvsTable].Notify()
|
||||||
|
return true, tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
// Snapshot is used to create a point in time snapshot
|
// Snapshot is used to create a point in time snapshot
|
||||||
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
||||||
// Begin a new txn on all tables
|
// Begin a new txn on all tables
|
||||||
@ -742,3 +922,10 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks {
|
|||||||
_, checks := s.store.parseHealthChecks(s.lastIndex, res, err)
|
_, checks := s.store.parseHealthChecks(s.lastIndex, res, err)
|
||||||
return checks
|
return checks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVSDump is used to list all KV entries. It takes a channel and streams
|
||||||
|
// back *struct.DirEntry objects. This will block and should be invoked
|
||||||
|
// in a goroutine.
|
||||||
|
func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
|
||||||
|
return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
|
||||||
|
}
|
||||||
|
@ -550,6 +550,16 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add some KVS entries
|
||||||
|
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(14, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(15, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Take a snapshot
|
// Take a snapshot
|
||||||
snap, err := store.Snapshot()
|
snap, err := store.Snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -558,7 +568,7 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
defer snap.Close()
|
defer snap.Close()
|
||||||
|
|
||||||
// Check the last nodes
|
// Check the last nodes
|
||||||
if idx := snap.LastIndex(); idx != 13 {
|
if idx := snap.LastIndex(); idx != 15 {
|
||||||
t.Fatalf("bad: %v", idx)
|
t.Fatalf("bad: %v", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -591,6 +601,28 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
t.Fatalf("bad: %v", checks[0])
|
t.Fatalf("bad: %v", checks[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check we have the entries
|
||||||
|
streamCh := make(chan interface{}, 64)
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
var ents []*structs.DirEntry
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
obj := <-streamCh
|
||||||
|
if obj == nil {
|
||||||
|
close(doneCh)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ents = append(ents, obj.(*structs.DirEntry))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if err := snap.KVSDump(streamCh); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
<-doneCh
|
||||||
|
if len(ents) != 2 {
|
||||||
|
t.Fatalf("missing KVS entries!")
|
||||||
|
}
|
||||||
|
|
||||||
// Make some changes!
|
// Make some changes!
|
||||||
if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
|
if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
@ -612,6 +644,10 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := store.KVSDelete(18, "/web/a"); err != nil {
|
||||||
|
t.Fatalf("err: %v")
|
||||||
|
}
|
||||||
|
|
||||||
// Check snapshot has old values
|
// Check snapshot has old values
|
||||||
nodes = snap.Nodes()
|
nodes = snap.Nodes()
|
||||||
if len(nodes) != 2 {
|
if len(nodes) != 2 {
|
||||||
@ -639,6 +675,28 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
if !reflect.DeepEqual(checks[0], check) {
|
if !reflect.DeepEqual(checks[0], check) {
|
||||||
t.Fatalf("bad: %v", checks[0])
|
t.Fatalf("bad: %v", checks[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check we have the entries
|
||||||
|
streamCh = make(chan interface{}, 64)
|
||||||
|
doneCh = make(chan struct{})
|
||||||
|
ents = nil
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
obj := <-streamCh
|
||||||
|
if obj == nil {
|
||||||
|
close(doneCh)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ents = append(ents, obj.(*structs.DirEntry))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if err := snap.KVSDump(streamCh); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
<-doneCh
|
||||||
|
if len(ents) != 2 {
|
||||||
|
t.Fatalf("missing KVS entries!")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEnsureCheck(t *testing.T) {
|
func TestEnsureCheck(t *testing.T) {
|
||||||
@ -933,3 +991,279 @@ func TestSS_Register_Deregister_Query(t *testing.T) {
|
|||||||
t.Fatalf("Bad: %v", nodes)
|
t.Fatalf("Bad: %v", nodes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestKVSSet_Get(t *testing.T) {
|
||||||
|
store, err := testStateStore()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close()
|
||||||
|
|
||||||
|
// Should not exist
|
||||||
|
idx, d, err := store.KVSGet("/foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 0 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if d != nil {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the entry
|
||||||
|
d = &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1000, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should exist exist
|
||||||
|
idx, d, err = store.KVSGet("/foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 1000 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if d.CreateIndex != 1000 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.ModifyIndex != 1000 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.Key != "/foo" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.Flags != 42 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if string(d.Value) != "test" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the entry
|
||||||
|
d = &structs.DirEntry{Key: "/foo", Flags: 43, Value: []byte("zip")}
|
||||||
|
if err := store.KVSSet(1010, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should update
|
||||||
|
idx, d, err = store.KVSGet("/foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 1010 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if d.CreateIndex != 1000 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.ModifyIndex != 1010 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.Key != "/foo" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if d.Flags != 43 {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
if string(d.Value) != "zip" {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVSDelete(t *testing.T) {
|
||||||
|
store, err := testStateStore()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close()
|
||||||
|
|
||||||
|
// Create the entry
|
||||||
|
d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1000, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the entry
|
||||||
|
if err := store.KVSDelete(1020, "/foo"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should not exist
|
||||||
|
idx, d, err := store.KVSGet("/foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 1020 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if d != nil {
|
||||||
|
t.Fatalf("bad: %v", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVSCheckAndSet(t *testing.T) {
|
||||||
|
store, err := testStateStore()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close()
|
||||||
|
|
||||||
|
// CAS should fail, no entry
|
||||||
|
d := &structs.DirEntry{
|
||||||
|
ModifyIndex: 100,
|
||||||
|
Key: "/foo",
|
||||||
|
Flags: 42,
|
||||||
|
Value: []byte("test"),
|
||||||
|
}
|
||||||
|
ok, err := store.KVSCheckAndSet(1000, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("unexpected commit")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Constrain on not-exist, should work
|
||||||
|
d.ModifyIndex = 0
|
||||||
|
ok, err = store.KVSCheckAndSet(1001, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected commit")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Constrain on not-exist, should fail
|
||||||
|
d.ModifyIndex = 0
|
||||||
|
ok, err = store.KVSCheckAndSet(1002, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("unexpected commit")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Constrain on a wrong modify time
|
||||||
|
d.ModifyIndex = 1000
|
||||||
|
ok, err = store.KVSCheckAndSet(1003, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
t.Fatalf("unexpected commit")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Constrain on a correct modify time
|
||||||
|
d.ModifyIndex = 1001
|
||||||
|
ok, err = store.KVSCheckAndSet(1004, d)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected commit")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVS_List(t *testing.T) {
|
||||||
|
store, err := testStateStore()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close()
|
||||||
|
|
||||||
|
// Should not exist
|
||||||
|
idx, ents, err := store.KVSList("/web")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 0 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if len(ents) != 0 {
|
||||||
|
t.Fatalf("bad: %v", ents)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the entries
|
||||||
|
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1000, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1001, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1002, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should list
|
||||||
|
idx, ents, err = store.KVSList("/web")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 1002 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if len(ents) != 3 {
|
||||||
|
t.Fatalf("bad: %v", ents)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ents[0].Key != "/web/a" {
|
||||||
|
t.Fatalf("bad: %v", ents[0])
|
||||||
|
}
|
||||||
|
if ents[1].Key != "/web/b" {
|
||||||
|
t.Fatalf("bad: %v", ents[1])
|
||||||
|
}
|
||||||
|
if ents[2].Key != "/web/sub/c" {
|
||||||
|
t.Fatalf("bad: %v", ents[2])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVSDeleteTree(t *testing.T) {
|
||||||
|
store, err := testStateStore()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer store.Close()
|
||||||
|
|
||||||
|
// Should not exist
|
||||||
|
err = store.KVSDeleteTree(1000, "/web")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the entries
|
||||||
|
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1000, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1001, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")}
|
||||||
|
if err := store.KVSSet(1002, d); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nuke the web tree
|
||||||
|
err = store.KVSDeleteTree(1010, "/web")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nothing should list
|
||||||
|
idx, ents, err := store.KVSList("/web")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if idx != 1010 {
|
||||||
|
t.Fatalf("bad: %v", idx)
|
||||||
|
}
|
||||||
|
if len(ents) != 0 {
|
||||||
|
t.Fatalf("bad: %v", ents)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@ type MessageType uint8
|
|||||||
const (
|
const (
|
||||||
RegisterRequestType MessageType = iota
|
RegisterRequestType MessageType = iota
|
||||||
DeregisterRequestType
|
DeregisterRequestType
|
||||||
|
KVSRequestType
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -172,6 +173,45 @@ type IndexedCheckServiceNodes struct {
|
|||||||
Nodes CheckServiceNodes
|
Nodes CheckServiceNodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DirEntry is used to represent a directory entry. This is
|
||||||
|
// used for values in our Key-Value store.
|
||||||
|
type DirEntry struct {
|
||||||
|
CreateIndex uint64
|
||||||
|
ModifyIndex uint64
|
||||||
|
Key string
|
||||||
|
Flags uint64
|
||||||
|
Value []byte
|
||||||
|
}
|
||||||
|
type DirEntries []*DirEntry
|
||||||
|
|
||||||
|
type KVSOp string
|
||||||
|
|
||||||
|
const (
|
||||||
|
KVSSet KVSOp = "set"
|
||||||
|
KVSDelete = "delete"
|
||||||
|
KVSDeleteTree = "delete-tree"
|
||||||
|
KVSCAS = "cas" // Check-and-set
|
||||||
|
)
|
||||||
|
|
||||||
|
// KVSRequest is used to operate on the Key-Value store
|
||||||
|
type KVSRequest struct {
|
||||||
|
Datacenter string
|
||||||
|
Op KVSOp // Which operation are we performing
|
||||||
|
DirEnt DirEntry // Which directory entry
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyRequest is used to request a key, or key prefix
|
||||||
|
type KeyRequest struct {
|
||||||
|
Datacenter string
|
||||||
|
Key string
|
||||||
|
BlockingQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
type IndexedDirEntries struct {
|
||||||
|
Index uint64
|
||||||
|
Entries DirEntries
|
||||||
|
}
|
||||||
|
|
||||||
// Decode is used to decode a MsgPack encoded object
|
// Decode is used to decode a MsgPack encoded object
|
||||||
func Decode(buf []byte, out interface{}) error {
|
func Decode(buf []byte, out interface{}) error {
|
||||||
var handle codec.MsgpackHandle
|
var handle codec.MsgpackHandle
|
||||||
|
@ -7,11 +7,12 @@ sidebar_current: "docs-agent-http"
|
|||||||
# HTTP API
|
# HTTP API
|
||||||
|
|
||||||
The main interface to Consul is a RESTful HTTP API. The API can be
|
The main interface to Consul is a RESTful HTTP API. The API can be
|
||||||
used for CRUD for nodes, services, and checks. The endpoints are
|
used for CRUD for nodes, services, checks, and configuration. The endpoints are
|
||||||
versioned to enable changes without breaking backwards compatibility.
|
versioned to enable changes without breaking backwards compatibility.
|
||||||
|
|
||||||
All endpoints fall into one of 4 categories:
|
All endpoints fall into one of 5 categories:
|
||||||
|
|
||||||
|
* kv - Key/Value store
|
||||||
* agent - Agent control
|
* agent - Agent control
|
||||||
* catalog - Manages nodes and services
|
* catalog - Manages nodes and services
|
||||||
* health - Manages health checks
|
* health - Manages health checks
|
||||||
@ -28,7 +29,7 @@ Queries that support this will mention it specifically, however the use of this
|
|||||||
feature is the same for all. If supported, the query will set an HTTP header
|
feature is the same for all. If supported, the query will set an HTTP header
|
||||||
"X-Consul-Index". This is an opaque handle that the client will use.
|
"X-Consul-Index". This is an opaque handle that the client will use.
|
||||||
|
|
||||||
To cause a query to block, the query parameters "?wait=<interval>&index=<idx>" are added
|
To cause a query to block, the query parameters "?wait=\<interval\>&index=\<idx\>" are added
|
||||||
to a request. The "?wait=" query parameter limits how long the query will potentially
|
to a request. The "?wait=" query parameter limits how long the query will potentially
|
||||||
block for. It not set, it will default to 10 minutes. It can be specified in the form of
|
block for. It not set, it will default to 10 minutes. It can be specified in the form of
|
||||||
"10s" or "5m", which is 10 seconds or 5 minutes respectively. The "?index=" parameter is an
|
"10s" or "5m", which is 10 seconds or 5 minutes respectively. The "?index=" parameter is an
|
||||||
@ -41,6 +42,72 @@ note is that when the query returns there is **no guarantee** of a change. It is
|
|||||||
possible that the timeout was reached, or that there was an idempotent write that
|
possible that the timeout was reached, or that there was an idempotent write that
|
||||||
does not affect the result.
|
does not affect the result.
|
||||||
|
|
||||||
|
|
||||||
|
## KV
|
||||||
|
|
||||||
|
The KV endpoint is used to expose a simple key/value store. This can be used
|
||||||
|
to store service configurations or other meta data in a simple way. It has only
|
||||||
|
a single endpoint:
|
||||||
|
|
||||||
|
/v1/kv/<key>
|
||||||
|
|
||||||
|
This is the only endpoint that is used with the Key/Value store.
|
||||||
|
It's use depends on the HTTP method. The `GET`, `PUT` and `DELETE` methods
|
||||||
|
are all supported.
|
||||||
|
|
||||||
|
When using the `GET` method, Consul will return the specified key,
|
||||||
|
or if the "?recurse" query parameter is provided, it will return
|
||||||
|
all keys with the given prefix.
|
||||||
|
|
||||||
|
Each object will look like:
|
||||||
|
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"CreateIndex":100,
|
||||||
|
"ModifyIndex":200,
|
||||||
|
"Key":"zip",
|
||||||
|
"Flags":0,
|
||||||
|
"Value":"dGVzdA=="
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
The `CreateIndex` is the internal index value that represents
|
||||||
|
when the entry was created. The `ModifyIndex` is the last index
|
||||||
|
that modified this key. This index corresponds to the `X-Consul-Index`
|
||||||
|
header value that is returned. A blocking query can be used to wait for
|
||||||
|
a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds
|
||||||
|
to the latest `ModifyIndex` and so a blocking query waits until any of the
|
||||||
|
listed keys are updated.
|
||||||
|
|
||||||
|
The `Key` is simply the full path of the entry. `Flags` are an opaque
|
||||||
|
unsigned integer that can be attached to each entry. The use of this is
|
||||||
|
left totally to the user. Lastly, the `Value` is a base64 key value.
|
||||||
|
|
||||||
|
If no entries are found, a 404 code is returned.
|
||||||
|
|
||||||
|
When using the `PUT` method, Consul expects the request body to be the
|
||||||
|
value corresponding to the key. There are a number of parameters that can
|
||||||
|
be used with a PUT request:
|
||||||
|
|
||||||
|
* ?flags=\<num\> : This can be used to specify an unsigned value between
|
||||||
|
0 and 2^64-1. It is opaque to the user, but a client application may
|
||||||
|
use it.
|
||||||
|
|
||||||
|
* ?cas=\<index\> : This flag is used to turn the `PUT` into a **Check-And-Set**
|
||||||
|
operation. This is very useful as it allows clients to build more complex
|
||||||
|
syncronization primitives on top. If the index is 0, then Consul will only
|
||||||
|
put the key if it does not already exist. If the index is non-zero, then
|
||||||
|
the key is only set if the index matches the `ModifyIndex` of that key.
|
||||||
|
|
||||||
|
The return value is simply either `true` or `false`. If the CAS check fails,
|
||||||
|
then `false` will be returned.
|
||||||
|
|
||||||
|
Lastly, the `DELETE` method can be used to delete a single key or all
|
||||||
|
keys sharing a prefix. If the "?recurse" query parameter is provided,
|
||||||
|
then all keys with the prefix are deleted, otherwise only the specified
|
||||||
|
key.
|
||||||
|
|
||||||
|
|
||||||
## Agent
|
## Agent
|
||||||
|
|
||||||
The Agent endpoints are used to interact with a local Consul agent. Usually,
|
The Agent endpoints are used to interact with a local Consul agent. Usually,
|
||||||
|
@ -35,30 +35,34 @@ Here is an example output:
|
|||||||
num_peers = 2
|
num_peers = 2
|
||||||
state = Leader
|
state = Leader
|
||||||
term = 4
|
term = 4
|
||||||
serf-lan:
|
serf_lan:
|
||||||
event-queue = 0
|
event_queue = 0
|
||||||
event-time = 2
|
event_time = 2
|
||||||
failed = 0
|
failed = 0
|
||||||
intent-queue = 0
|
intent_queue = 0
|
||||||
left = 0
|
left = 0
|
||||||
member-time = 7
|
member_time = 7
|
||||||
members = 3
|
members = 3
|
||||||
serf-wan:
|
query_queue = 0
|
||||||
event-queue = 0
|
query_time = 1
|
||||||
event-time = 1
|
serf_wan:
|
||||||
|
event_queue = 0
|
||||||
|
event_time = 1
|
||||||
failed = 0
|
failed = 0
|
||||||
intent-queue = 0
|
intent_queue = 0
|
||||||
left = 0
|
left = 0
|
||||||
member-time = 1
|
member_time = 1
|
||||||
members = 1
|
members = 1
|
||||||
|
query_queue = 0
|
||||||
|
query_time = 1
|
||||||
|
|
||||||
There are currently the top-level keys for:
|
There are currently the top-level keys for:
|
||||||
|
|
||||||
* agent: Provides information about the agent
|
* agent: Provides information about the agent
|
||||||
* consul: Information about the consul library (client or server)
|
* consul: Information about the consul library (client or server)
|
||||||
* raft: Provides info about the Raft [consensus library](/docs/internals/consensus.html)
|
* raft: Provides info about the Raft [consensus library](/docs/internals/consensus.html)
|
||||||
* serf-lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html)
|
* serf_lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html)
|
||||||
* serf-wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html)
|
* serf_wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html)
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user