OSS KV Modifications to Support Namespaces

This commit is contained in:
Matt Keeler 2019-11-25 12:57:35 -05:00
parent 7b471f6bf8
commit b069d6777b
No known key found for this signature in database
GPG Key ID: 04DBAE1857E0081B
41 changed files with 961 additions and 787 deletions

View File

@ -14,7 +14,10 @@ func (d *dirEntFilter) Len() int {
return len(d.ent)
}
func (d *dirEntFilter) Filter(i int) bool {
return d.authorizer.KeyRead(d.ent[i].Key, nil) != acl.Allow
var entCtx acl.EnterpriseAuthorizerContext
d.ent[i].FillAuthzContext(&entCtx)
return d.authorizer.KeyRead(d.ent[i].Key, &entCtx) != acl.Allow
}
func (d *dirEntFilter) Move(dst, src, span int) {
copy(d.ent[dst:dst+span], d.ent[src:src+span])
@ -27,30 +30,6 @@ func FilterDirEnt(authorizer acl.Authorizer, ent structs.DirEntries) structs.Dir
return ent[:FilterEntries(&df)]
}
type keyFilter struct {
authorizer acl.Authorizer
keys []string
}
func (k *keyFilter) Len() int {
return len(k.keys)
}
func (k *keyFilter) Filter(i int) bool {
// TODO (namespaces) use a real ent authz context here
return k.authorizer.KeyRead(k.keys[i], nil) != acl.Allow
}
func (k *keyFilter) Move(dst, src, span int) {
copy(k.keys[dst:dst+span], k.keys[src:src+span])
}
// FilterKeys is used to filter a list of keys by
// applying an ACL policy
func FilterKeys(authorizer acl.Authorizer, keys []string) []string {
kf := keyFilter{authorizer: authorizer, keys: keys}
return keys[:FilterEntries(&kf)]
}
type txnResultsFilter struct {
authorizer acl.Authorizer
results structs.TxnResults

View File

@ -50,38 +50,6 @@ func TestFilter_DirEnt(t *testing.T) {
}
}
func TestFilter_Keys(t *testing.T) {
t.Parallel()
policy, _ := acl.NewPolicyFromSource("", 0, testFilterRules, acl.SyntaxLegacy, nil, nil)
aclR, _ := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
type tcase struct {
in []string
out []string
}
cases := []tcase{
tcase{
in: []string{"foo/test", "foo/priv/nope", "foo/other", "zoo"},
out: []string{"foo/test", "foo/other"},
},
tcase{
in: []string{"abe", "lincoln"},
out: []string{},
},
tcase{
in: []string{"abe", "foo/1", "foo/2", "foo/3", "nope"},
out: []string{"foo/1", "foo/2", "foo/3"},
},
}
for _, tc := range cases {
out := FilterKeys(aclR, tc.in)
if !reflect.DeepEqual(out, tc.out) {
t.Fatalf("bad: %#v %#v", out, tc.out)
}
}
}
func TestFilter_TxnResults(t *testing.T) {
t.Parallel()
policy, _ := acl.NewPolicyFromSource("", 0, testFilterRules, acl.SyntaxLegacy, nil, nil)

View File

@ -93,15 +93,15 @@ func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
case api.KVSet:
return c.state.KVSSet(index, &req.DirEnt)
case api.KVDelete:
return c.state.KVSDelete(index, req.DirEnt.Key)
return c.state.KVSDelete(index, req.DirEnt.Key, &req.DirEnt.EnterpriseMeta)
case api.KVDeleteCAS:
act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key)
act, err := c.state.KVSDeleteCAS(index, req.DirEnt.ModifyIndex, req.DirEnt.Key, &req.DirEnt.EnterpriseMeta)
if err != nil {
return err
}
return act
case api.KVDeleteTree:
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
return c.state.KVSDeleteTree(index, req.DirEnt.Key, &req.DirEnt.EnterpriseMeta)
case api.KVCAS:
act, err := c.state.KVSSetCAS(index, &req.DirEnt)
if err != nil {

View File

@ -390,7 +390,7 @@ func TestFSM_KVSDelete(t *testing.T) {
}
// Verify key is not set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -437,7 +437,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
}
// Verify key is not set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -472,7 +472,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
}
// Verify key is set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -493,7 +493,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
}
// Verify key is gone
_, d, err = fsm.state.KVSGet(nil, "/test/path")
_, d, err = fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -528,7 +528,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
}
// Verify key is set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -550,7 +550,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
}
// Verify key is updated
_, d, err = fsm.state.KVSGet(nil, "/test/path")
_, d, err = fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -566,9 +566,16 @@ func TestFSM_KVSLock(t *testing.T) {
t.Fatalf("err: %v", err)
}
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
err = fsm.state.SessionCreate(2, session)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
@ -589,7 +596,7 @@ func TestFSM_KVSLock(t *testing.T) {
}
// Verify key is locked
_, d, err := fsm.state.KVSGet(nil, "/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -611,9 +618,16 @@ func TestFSM_KVSUnlock(t *testing.T) {
t.Fatalf("err: %v", err)
}
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
err = fsm.state.SessionCreate(2, session)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
@ -652,7 +666,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
}
// Verify key is unlocked
_, d, err := fsm.state.KVSGet(nil, "/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -675,8 +689,14 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
}
// Register some nodes.
fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"})
fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"})
err = fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
// Write a batch of two coordinates.
updates := structs.Coordinates{
@ -715,12 +735,19 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
t.Fatalf("err: %v", err)
}
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureCheck(2, &structs.HealthCheck{
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureCheck(2, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Status: api.HealthPassing,
})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new session
req := structs.SessionRequest{
@ -914,8 +941,15 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) {
}
// Register a service to query on.
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new query.
query := structs.PreparedQueryRequest{
@ -1012,12 +1046,20 @@ func TestFSM_TombstoneReap(t *testing.T) {
}
// Create some tombstones
fsm.state.KVSSet(11, &structs.DirEntry{
err = fsm.state.KVSSet(11, &structs.DirEntry{
Key: "/remove",
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList(nil, "/remove")
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.KVSDelete(12, "/remove", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
idx, _, err := fsm.state.KVSList(nil, "/remove", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1085,7 +1127,7 @@ func TestFSM_Txn(t *testing.T) {
}
// Verify key is set directly in the state store.
_, d, err := fsm.state.KVSGet(nil, "/test/path")
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1599,7 +1641,11 @@ func TestFSM_Chunking_TermChange(t *testing.T) {
// Now verify the other baseline, that when the term doesn't change we see
// non-nil. First make the chunker have a clean state, then set the terms
// to be the same.
fsm.chunker.RestoreState(nil)
err = fsm.chunker.RestoreState(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
logs[1].Term = uint64(0)
// We should see nil only for the first one

View File

@ -143,8 +143,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
Key: "/remove",
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList(nil, "/remove")
fsm.state.KVSDelete(12, "/remove", nil)
idx, _, err := fsm.state.KVSList(nil, "/remove", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -351,7 +351,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
}
// Verify key is set
_, d, err := fsm2.state.KVSGet(nil, "/test")
_, d, err := fsm2.state.KVSGet(nil, "/test", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -2,6 +2,7 @@ package consul
import (
"fmt"
"strings"
"time"
"github.com/armon/go-metrics"
@ -22,7 +23,6 @@ type KVS struct {
// must only be done on the leader.
func kvsPreApply(srv *Server, rule acl.Authorizer, op api.KVOp, dirEnt *structs.DirEntry) (bool, error) {
// Verify the entry.
if dirEnt.Key == "" && op != api.KVDeleteTree {
return false, fmt.Errorf("Must provide key")
}
@ -31,8 +31,10 @@ func kvsPreApply(srv *Server, rule acl.Authorizer, op api.KVOp, dirEnt *structs.
if rule != nil {
switch op {
case api.KVDeleteTree:
// TODO (namespaces) use actual ent authz context - ensure we set the Sentinel Scope
if rule.KeyWritePrefix(dirEnt.Key, nil) != acl.Allow {
var authzContext acl.EnterpriseAuthorizerContext
dirEnt.FillAuthzContext(&authzContext)
if rule.KeyWritePrefix(dirEnt.Key, &authzContext) != acl.Allow {
return false, acl.ErrPermissionDenied
}
@ -43,13 +45,17 @@ func kvsPreApply(srv *Server, rule acl.Authorizer, op api.KVOp, dirEnt *structs.
// These could reveal information based on the outcome
// of the transaction, and they operate on individual
// keys so we check them here.
if rule.KeyRead(dirEnt.Key, nil) != acl.Allow {
var authzContext acl.EnterpriseAuthorizerContext
dirEnt.FillAuthzContext(&authzContext)
if rule.KeyRead(dirEnt.Key, &authzContext) != acl.Allow {
return false, acl.ErrPermissionDenied
}
default:
var authzContext acl.EnterpriseAuthorizerContext
dirEnt.FillAuthzContext(&authzContext)
if rule.KeyWrite(dirEnt.Key, &authzContext) != acl.Allow {
return false, acl.ErrPermissionDenied
}
@ -64,7 +70,7 @@ func kvsPreApply(srv *Server, rule acl.Authorizer, op api.KVOp, dirEnt *structs.
// only the wall-time of the leader node is used, preventing any inconsistencies.
if op == api.KVLock {
state := srv.fsm.State()
expires := state.KVSLockDelay(dirEnt.Key)
expires := state.KVSLockDelay(dirEnt.Key, &dirEnt.EnterpriseMeta)
if expires.After(time.Now()) {
srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
dirEnt.Key, expires)
@ -82,12 +88,16 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
}
defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now())
if err := k.srv.validateEnterpriseRequest(&args.DirEnt.EnterpriseMeta, true); err != nil {
return err
}
// Perform the pre-apply checks.
acl, err := k.srv.ResolveToken(args.Token)
rule, err := k.srv.ResolveToken(args.Token)
if err != nil {
return err
}
ok, err := kvsPreApply(k.srv, acl, args.Op, &args.DirEnt)
ok, err := kvsPreApply(k.srv, rule, args.Op, &args.DirEnt)
if err != nil {
return err
}
@ -118,20 +128,27 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
if done, err := k.srv.forward("KVS.Get", args, args, reply); done {
return err
}
if err := k.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
aclRule, err := k.srv.ResolveToken(args.Token)
var entCtx acl.EnterpriseAuthorizerContext
args.FillAuthzContext(&entCtx)
rule, err := k.srv.ResolveToken(args.Token)
if err != nil {
return err
}
return k.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, ent, err := state.KVSGet(ws, args.Key)
index, ent, err := state.KVSGet(ws, args.Key, &args.EnterpriseMeta)
if err != nil {
return err
}
if aclRule != nil && aclRule.KeyRead(args.Key, nil) != acl.Allow {
if rule != nil && rule.KeyRead(args.Key, &entCtx) != acl.Allow {
return acl.ErrPermissionDenied
}
@ -157,13 +174,18 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
if done, err := k.srv.forward("KVS.List", args, args, reply); done {
return err
}
aclToken, err := k.srv.ResolveToken(args.Token)
if err != nil {
if err := k.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if aclToken != nil && k.srv.config.ACLEnableKeyListPolicy && aclToken.KeyList(args.Key, nil) != acl.Allow {
var entCtx acl.EnterpriseAuthorizerContext
args.FillAuthzContext(&entCtx)
rule, err := k.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && k.srv.config.ACLEnableKeyListPolicy && rule.KeyList(args.Key, &entCtx) != acl.Allow {
return acl.ErrPermissionDenied
}
@ -171,12 +193,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, ent, err := state.KVSList(ws, args.Key)
index, ent, err := state.KVSList(ws, args.Key, &args.EnterpriseMeta)
if err != nil {
return err
}
if aclToken != nil {
ent = FilterDirEnt(aclToken, ent)
if rule != nil {
ent = FilterDirEnt(rule, ent)
}
if len(ent) == 0 {
@ -197,17 +219,25 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
}
// ListKeys is used to list all keys with a given prefix to a separator.
// An optional separator may be specified, which can be used to slice off a part
// of the response so that only a subset of the prefix is returned. In this
// mode, the keys which are omitted are still counted in the returned index.
func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error {
if done, err := k.srv.forward("KVS.ListKeys", args, args, reply); done {
return err
}
aclToken, err := k.srv.ResolveToken(args.Token)
if err != nil {
if err := k.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if aclToken != nil && k.srv.config.ACLEnableKeyListPolicy && aclToken.KeyList(args.Prefix, nil) != acl.Allow {
var entCtx acl.EnterpriseAuthorizerContext
args.FillAuthzContext(&entCtx)
rule, err := k.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && k.srv.config.ACLEnableKeyListPolicy && rule.KeyList(args.Prefix, &entCtx) != acl.Allow {
return acl.ErrPermissionDenied
}
@ -215,7 +245,7 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator)
index, entries, err := state.KVSList(ws, args.Prefix, &args.EnterpriseMeta)
if err != nil {
return err
}
@ -228,8 +258,37 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
reply.Index = index
}
if aclToken != nil {
keys = FilterKeys(aclToken, keys)
if rule != nil {
entries = FilterDirEnt(rule, entries)
}
// Collect the keys from the filtered entries
prefixLen := len(args.Prefix)
sepLen := len(args.Seperator)
var keys []string
seen := make(map[string]bool)
for _, e := range entries {
// Always accumulate if no separator provided
if sepLen == 0 {
keys = append(keys, e.Key)
continue
}
// Parse and de-duplicate the returned keys based on the
// key separator, if provided.
after := e.Key[prefixLen:]
sepIdx := strings.Index(after, args.Seperator)
if sepIdx > -1 {
key := e.Key[:prefixLen+sepIdx+sepLen]
if ok := seen[key]; !ok {
keys = append(keys, key)
seen[key] = true
}
} else {
keys = append(keys, e.Key)
}
}
reply.Keys = keys
return nil

View File

@ -21,7 +21,7 @@ func TestKVS_Apply(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
arg := structs.KVSRequest{
Datacenter: "dc1",
@ -39,7 +39,7 @@ func TestKVS_Apply(t *testing.T) {
// Verify
state := s1.fsm.State()
_, d, err := state.KVSGet(nil, "test")
_, d, err := state.KVSGet(nil, "test", &arg.DirEnt.EnterpriseMeta)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -61,7 +61,7 @@ func TestKVS_Apply(t *testing.T) {
}
// Verify
_, d, err = state.KVSGet(nil, "test")
_, d, err = state.KVSGet(nil, "test", &arg.DirEnt.EnterpriseMeta)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -83,7 +83,7 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// Create the ACL
arg := structs.ACLRequest{
@ -142,7 +142,7 @@ func TestKVS_Get(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
arg := structs.KVSRequest{
Datacenter: "dc1",
@ -195,7 +195,7 @@ func TestKVS_Get_ACLDeny(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
arg := structs.KVSRequest{
Datacenter: "dc1",
@ -231,7 +231,7 @@ func TestKVSEndpoint_List(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
keys := []string{
"/test/key1",
@ -303,7 +303,7 @@ func TestKVSEndpoint_List_Blocking(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
keys := []string{
"/test/key1",
@ -404,7 +404,7 @@ func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
keys := []string{
"abe",
@ -491,7 +491,7 @@ func TestKVSEndpoint_List_ACLEnableKeyListPolicy(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
keys := []string{
"abe",
@ -610,7 +610,7 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
keys := []string{
"/test/key1",
@ -685,7 +685,7 @@ func TestKVSEndpoint_ListKeys_ACLDeny(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
keys := []string{
"abe",
@ -760,7 +760,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock.
state := s1.fsm.State()
@ -832,7 +832,7 @@ func TestKVS_Issue_1626(t *testing.T) {
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// Set up the first key.
{

View File

@ -25,6 +25,10 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
}
defer metrics.MeasureSince([]string{"session", "apply"}, time.Now())
if err := s.srv.validateEnterpriseRequest(&args.Session.EnterpriseMeta, true); err != nil {
return err
}
// Verify the args
if args.Session.ID == "" && args.Op == structs.SessionDestroy {
return fmt.Errorf("Must provide ID")
@ -33,14 +37,17 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
return fmt.Errorf("Must provide Node")
}
// TODO (namespaces) (acls) infer entmeta if not provided.
// The entMeta to populate will be the one in the Session struct, not SessionRequest
// This is because the Session is what is passed to downstream functions like raftApply
var entCtx acl.EnterpriseAuthorizerContext
args.Session.EnterpriseMeta.FillAuthzContext(&entCtx)
// Fetch the ACL token, if any, and apply the policy.
rule, err := s.srv.ResolveToken(args.Token)
if err != nil {
return err
}
// TODO (namespaces) (acls) infer entmeta if not provided.
// The entMeta to populate will be the one in the Session struct, not SessionRequest
// This is because the Session is what is passed to downstream functions like raftApply
if rule != nil && s.srv.config.ACLEnforceVersion8 {
switch args.Op {
@ -53,14 +60,12 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
if existing == nil {
return fmt.Errorf("Unknown session %q", args.Session.ID)
}
// TODO (namespaces) - pass through a real ent authz ctx
if rule.SessionWrite(existing.Node, nil) != acl.Allow {
if rule.SessionWrite(existing.Node, &entCtx) != acl.Allow {
return acl.ErrPermissionDenied
}
case structs.SessionCreate:
// TODO (namespaces) - pass through a real ent authz ctx
if rule.SessionWrite(args.Session.Node, nil) != acl.Allow {
if rule.SessionWrite(args.Session.Node, &entCtx) != acl.Allow {
return acl.ErrPermissionDenied
}
@ -151,7 +156,18 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
return err
}
if err := s.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
// TODO (namespaces) TODO (acls) infer args.entmeta if not provided
var entCtx acl.EnterpriseAuthorizerContext
args.FillAuthzContext(&entCtx)
rule, err := s.srv.ResolveToken(args.Token)
if err != nil {
return err
}
return s.srv.blockingQuery(
&args.QueryOptions,
@ -168,7 +184,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
} else {
reply.Sessions = nil
}
if err := s.srv.filterACL(args.Token, reply); err != nil {
if err := s.srv.filterACLWithAuthorizer(rule, reply); err != nil {
return err
}
return nil
@ -182,7 +198,18 @@ func (s *Session) List(args *structs.SessionSpecificRequest,
return err
}
if err := s.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
// TODO (namespaces) TODO (acls) infer args.entmeta if not provided
var entCtx acl.EnterpriseAuthorizerContext
args.FillAuthzContext(&entCtx)
rule, err := s.srv.ResolveToken(args.Token)
if err != nil {
return err
}
return s.srv.blockingQuery(
&args.QueryOptions,
@ -194,7 +221,7 @@ func (s *Session) List(args *structs.SessionSpecificRequest,
}
reply.Index, reply.Sessions = index, sessions
if err := s.srv.filterACL(args.Token, reply); err != nil {
if err := s.srv.filterACLWithAuthorizer(rule, reply); err != nil {
return err
}
return nil
@ -208,7 +235,18 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
return err
}
if err := s.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
// TODO (namespaces) TODO (acls) infer args.entmeta if not provided
var entCtx acl.EnterpriseAuthorizerContext
args.FillAuthzContext(&entCtx)
rule, err := s.srv.ResolveToken(args.Token)
if err != nil {
return err
}
return s.srv.blockingQuery(
&args.QueryOptions,
@ -220,7 +258,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
}
reply.Index, reply.Sessions = index, sessions
if err := s.srv.filterACL(args.Token, reply); err != nil {
if err := s.srv.filterACLWithAuthorizer(rule, reply); err != nil {
return err
}
return nil
@ -235,7 +273,9 @@ func (s *Session) Renew(args *structs.SessionSpecificRequest,
}
defer metrics.MeasureSince([]string{"session", "renew"}, time.Now())
// TODO (namespaces) (freddy):infer args.entmeta if not provided
if err := s.srv.validateEnterpriseRequest(&args.EnterpriseMeta, true); err != nil {
return err
}
// Get the session, from local state.
state := s.srv.fsm.State()
@ -249,14 +289,17 @@ func (s *Session) Renew(args *structs.SessionSpecificRequest,
return nil
}
// TODO (namespaces) (freddy):infer args.entmeta if not provided
// Fetch the ACL token, if any, and apply the policy.
var entCtx acl.EnterpriseAuthorizerContext
args.FillAuthzContext(&entCtx)
rule, err := s.srv.ResolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && s.srv.config.ACLEnforceVersion8 {
// TODO (namespaces) - pass through a real ent authz ctx
if rule.SessionWrite(session.Node, nil) != acl.Allow {
if rule.SessionWrite(session.Node, &entCtx) != acl.Allow {
return acl.ErrPermissionDenied
}
}

View File

@ -8,7 +8,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/go-uuid"
)
const (
@ -745,15 +745,9 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
}
// Invalidate any sessions for this node.
sessions, err := tx.Get("sessions", "node", nodeName)
toDelete, err := s.allNodeSessionsTxn(tx, nodeName)
if err != nil {
return fmt.Errorf("failed session lookup: %s", err)
}
var toDelete []*structs.Session
for sess := sessions.Next(); sess != nil; sess = sessions.Next() {
session := sess.(*structs.Session)
toDelete = append(toDelete, session)
return err
}
for _, session := range toDelete {

View File

@ -1,6 +1,9 @@
// +build !consulent
package state
import (
"github.com/hashicorp/consul/agent/structs"
"sync"
"time"
)
@ -32,7 +35,7 @@ func NewDelay() *Delay {
// GetExpiration returns the expiration time of a key lock delay. This must be
// checked on the leader node, and not in KVSLock due to the variability of
// clocks.
func (d *Delay) GetExpiration(key string) time.Time {
func (d *Delay) GetExpiration(key string, entMeta *structs.EnterpriseMeta) time.Time {
d.lock.RLock()
expires := d.delay[key]
d.lock.RUnlock()
@ -41,7 +44,7 @@ func (d *Delay) GetExpiration(key string) time.Time {
// SetExpiration sets the expiration time for the lock delay to the given
// delay from the given now time.
func (d *Delay) SetExpiration(key string, now time.Time, delay time.Duration) {
func (d *Delay) SetExpiration(key string, now time.Time, delay time.Duration, entMeta *structs.EnterpriseMeta) {
d.lock.Lock()
defer d.lock.Unlock()

View File

@ -9,21 +9,21 @@ func TestDelay(t *testing.T) {
d := NewDelay()
// An unknown key should have a time in the past.
if exp := d.GetExpiration("nope"); !exp.Before(time.Now()) {
if exp := d.GetExpiration("nope", nil); !exp.Before(time.Now()) {
t.Fatalf("bad: %v", exp)
}
// Add a key and set a short expiration.
now := time.Now()
delay := 250 * time.Millisecond
d.SetExpiration("bye", now, delay)
if exp := d.GetExpiration("bye"); !exp.After(now) {
d.SetExpiration("bye", now, delay, nil)
if exp := d.GetExpiration("bye", nil); !exp.After(now) {
t.Fatalf("bad: %v", exp)
}
// Wait for the key to expire and check again.
time.Sleep(2 * delay)
if exp := d.GetExpiration("bye"); !exp.Before(now) {
if exp := d.GetExpiration("bye", nil); !exp.Before(now) {
t.Fatalf("bad: %v", exp)
}
}

View File

@ -2,7 +2,7 @@ package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)
@ -10,6 +10,8 @@ import (
type Tombstone struct {
Key string
Index uint64
structs.EnterpriseMeta
}
// Graveyard manages a set of tombstones.
@ -25,15 +27,18 @@ func NewGraveyard(gc *TombstoneGC) *Graveyard {
}
// InsertTxn adds a new tombstone.
func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error {
// Insert the tombstone.
stone := &Tombstone{Key: key, Index: idx}
if err := tx.Insert("tombstones", stone); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64, entMeta *structs.EnterpriseMeta) error {
stone := &Tombstone{
Key: key,
Index: idx,
}
if entMeta != nil {
stone.EnterpriseMeta = *entMeta
}
if err := tx.Insert("index", &IndexEntry{"tombstones", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
// Insert the tombstone.
if err := g.insertTombstoneWithTxn(tx, "tombstones", stone, false); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
}
// If GC is configured, then we hint that this index requires reaping.
@ -45,8 +50,8 @@ func (g *Graveyard) InsertTxn(tx *memdb.Txn, key string, idx uint64) error {
// GetMaxIndexTxn returns the highest index tombstone whose key matches the
// given context, using a prefix match.
func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, prefix string) (uint64, error) {
stones, err := tx.Get("tombstones", "id_prefix", prefix)
func (g *Graveyard) GetMaxIndexTxn(tx *memdb.Txn, prefix string, entMeta *structs.EnterpriseMeta) (uint64, error) {
stones, err := getWithTxn(tx, "tombstones", "id_prefix", prefix, entMeta)
if err != nil {
return 0, fmt.Errorf("failed querying tombstones: %s", err)
}
@ -74,13 +79,10 @@ func (g *Graveyard) DumpTxn(tx *memdb.Txn) (memdb.ResultIterator, error) {
// RestoreTxn is used when restoring from a snapshot. For general inserts, use
// InsertTxn.
func (g *Graveyard) RestoreTxn(tx *memdb.Txn, stone *Tombstone) error {
if err := tx.Insert("tombstones", stone); err != nil {
if err := g.insertTombstoneWithTxn(tx, "tombstones", stone, true); err != nil {
return fmt.Errorf("failed inserting tombstone: %s", err)
}
if err := indexUpdateMaxTxn(tx, stone.Index, "tombstones"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}

View File

@ -0,0 +1,28 @@
// +build !consulent
package state
import (
"fmt"
"github.com/hashicorp/go-memdb"
)
func (g *Graveyard) insertTombstoneWithTxn(tx *memdb.Txn,
table string, stone *Tombstone, updateMax bool) error {
if err := tx.Insert("tombstones", stone); err != nil {
return err
}
if updateMax {
if err := indexUpdateMaxTxn(tx, stone.Index, "tombstones"); err != nil {
return fmt.Errorf("failed updating tombstone index: %v", err)
}
} else {
if err := tx.Insert("index", &IndexEntry{"tombstones", stone.Index}); err != nil {
return fmt.Errorf("failed updating tombstone index: %s", err)
}
}
return nil
}

View File

@ -1,7 +1,6 @@
package state
import (
"reflect"
"testing"
"time"
)
@ -18,16 +17,16 @@ func TestGraveyard_Lifecycle(t *testing.T) {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
if err := g.InsertTxn(tx, "foo/in/the/house", 2, nil); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/baz", 5); err != nil {
if err := g.InsertTxn(tx, "foo/bar/baz", 5, nil); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/zoo", 8); err != nil {
if err := g.InsertTxn(tx, "foo/bar/zoo", 8, nil); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "some/other/path", 9); err != nil {
if err := g.InsertTxn(tx, "some/other/path", 9, nil); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
@ -38,25 +37,25 @@ func TestGraveyard_Lifecycle(t *testing.T) {
tx := s.db.Txn(false)
defer tx.Abort()
if idx, err := g.GetMaxIndexTxn(tx, "foo"); idx != 8 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo", nil); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/in/the/house"); idx != 2 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo/in/the/house", nil); idx != 2 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/baz"); idx != 5 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/baz", nil); idx != 5 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/zoo"); idx != 8 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/zoo", nil); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "some/other/path"); idx != 9 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "some/other/path", nil); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, ""); idx != 9 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "", nil); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "nope"); idx != 0 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "nope", nil); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
}()
@ -77,25 +76,25 @@ func TestGraveyard_Lifecycle(t *testing.T) {
tx := s.db.Txn(false)
defer tx.Abort()
if idx, err := g.GetMaxIndexTxn(tx, "foo"); idx != 8 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo", nil); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/in/the/house"); idx != 0 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo/in/the/house", nil); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/baz"); idx != 0 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/baz", nil); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/zoo"); idx != 8 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "foo/bar/zoo", nil); idx != 8 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "some/other/path"); idx != 9 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "some/other/path", nil); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, ""); idx != 9 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "", nil); idx != 9 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
if idx, err := g.GetMaxIndexTxn(tx, "nope"); idx != 0 || err != nil {
if idx, err := g.GetMaxIndexTxn(tx, "nope", nil); idx != 0 || err != nil {
t.Fatalf("bad: %d (%s)", idx, err)
}
}()
@ -125,7 +124,7 @@ func TestGraveyard_GC_Trigger(t *testing.T) {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
if err := g.InsertTxn(tx, "foo/in/the/house", 2, nil); err != nil {
t.Fatalf("err: %s", err)
}
}()
@ -140,7 +139,7 @@ func TestGraveyard_GC_Trigger(t *testing.T) {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
if err := g.InsertTxn(tx, "foo/in/the/house", 2, nil); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
@ -174,16 +173,16 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
tx := s.db.Txn(true)
defer tx.Abort()
if err := g.InsertTxn(tx, "foo/in/the/house", 2); err != nil {
if err := g.InsertTxn(tx, "foo/in/the/house", 2, nil); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/baz", 5); err != nil {
if err := g.InsertTxn(tx, "foo/bar/baz", 5, nil); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "foo/bar/zoo", 8); err != nil {
if err := g.InsertTxn(tx, "foo/bar/zoo", 8, nil); err != nil {
t.Fatalf("err: %s", err)
}
if err := g.InsertTxn(tx, "some/other/path", 9); err != nil {
if err := g.InsertTxn(tx, "some/other/path", 9, nil); err != nil {
t.Fatalf("err: %s", err)
}
tx.Commit()
@ -217,8 +216,16 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
&Tombstone{Key: "foo/in/the/house", Index: 2},
&Tombstone{Key: "some/other/path", Index: 9},
}
if !reflect.DeepEqual(dump, expected) {
t.Fatalf("bad: %v", dump)
if len(expected) != len(dump) {
t.Fatalf("expected %d, got %d tombstones", len(expected), len(dump))
}
for i, e := range expected {
if e.Key != dump[i].Key {
t.Fatalf("expected key %s, got %s", e.Key, dump[i].Key)
}
if e.Index != dump[i].Index {
t.Fatalf("expected key %s, got %s", e.Key, dump[i].Key)
}
}
// Make another state store and restore from the dump.
@ -255,8 +262,16 @@ func TestGraveyard_Snapshot_Restore(t *testing.T) {
}
return dump
}()
if !reflect.DeepEqual(dump, expected) {
t.Fatalf("bad: %v", dump)
if len(expected) != len(dump) {
t.Fatalf("expected %d, got %d tombstones", len(expected), len(dump))
}
for i, e := range expected {
if e.Key != dump[i].Key {
t.Fatalf("expected key %s, got %s", e.Key, dump[i].Key)
}
if e.Index != dump[i].Index {
t.Fatalf("expected idx %d, got %d", e.Index, dump[i].Index)
}
}
}()
}

View File

@ -2,7 +2,6 @@ package state
import (
"fmt"
"strings"
"time"
"github.com/hashicorp/consul/agent/structs"
@ -19,10 +18,7 @@ func kvsTableSchema() *memdb.TableSchema {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Key",
Lowercase: false,
},
Indexer: kvsIndexer(),
},
"session": &memdb.IndexSchema{
Name: "session",
@ -46,10 +42,7 @@ func tombstonesTableSchema() *memdb.TableSchema {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Key",
Lowercase: false,
},
Indexer: kvsIndexer(),
},
},
}
@ -76,13 +69,10 @@ func (s *Snapshot) Tombstones() (memdb.ResultIterator, error) {
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
func (s *Restore) KVS(entry *structs.DirEntry) error {
if err := s.tx.Insert("kvs", entry); err != nil {
if err := s.store.insertKVTxn(s.tx, entry, true); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, "kvs"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
@ -131,7 +121,7 @@ func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
// whatever the existing session is.
func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
// Retrieve an existing KV pair
existingNode, err := tx.First("kvs", "id", entry.Key)
existingNode, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
@ -161,32 +151,31 @@ func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, up
}
// Store the kv pair in the state store and update the index.
if err := tx.Insert("kvs", entry); err != nil {
if err := s.insertKVTxn(tx, entry, false); err != nil {
return fmt.Errorf("failed inserting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// KVSGet is used to retrieve a key/value pair from the state store.
func (s *Store) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
func (s *Store) KVSGet(ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.kvsGetTxn(tx, ws, key)
return s.kvsGetTxn(tx, ws, key, entMeta)
}
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
// transaction.
func (s *Store) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
func (s *Store) kvsGetTxn(tx *memdb.Txn,
ws memdb.WatchSet, key string, entMeta *structs.EnterpriseMeta) (uint64, *structs.DirEntry, error) {
// Get the table index.
idx := maxIndexTxn(tx, "kvs", "tombstones")
idx := kvsMaxIndex(tx, entMeta)
// Retrieve the key.
watchCh, entry, err := tx.FirstWatch("kvs", "id", key)
watchCh, entry, err := firstWatchWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
@ -201,41 +190,32 @@ func (s *Store) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64,
// prefix is left empty, all keys in the KVS will be returned. The returned
// is the max index of the returned kvs entries or applicable tombstones, or
// else it's the full table indexes for kvs and tombstones.
func (s *Store) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
func (s *Store) KVSList(ws memdb.WatchSet,
prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.kvsListTxn(tx, ws, prefix)
return s.kvsListTxn(tx, ws, prefix, entMeta)
}
// kvsListTxn is the inner method that gets a list of KVS entries matching a
// prefix.
func (s *Store) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
func (s *Store) kvsListTxn(tx *memdb.Txn,
ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
// Query the prefix and list the available keys
entries, err := tx.Get("kvs", "id_prefix", prefix)
// Get the table indexes.
idx := kvsMaxIndex(tx, entMeta)
lindex, entries, err := s.kvsListEntriesTxn(tx, ws, prefix, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
ws.Add(entries.WatchCh())
// Gather all of the keys found in the store
var ents structs.DirEntries
var lindex uint64
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
ents = append(ents, e)
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
}
// Check for the highest index in the graveyard. If the prefix is empty
// then just use the full table indexes since we are listing everything.
if prefix != "" {
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
}
@ -251,92 +231,17 @@ func (s *Store) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uin
if lindex != 0 {
idx = lindex
}
return idx, ents, nil
}
// KVSListKeys is used to query the KV store for keys matching the given prefix.
// An optional separator may be specified, which can be used to slice off a part
// of the response so that only a subset of the prefix is returned. In this
// mode, the keys which are omitted are still counted in the returned index.
func (s *Store) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table indexes.
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Fetch keys using the specified prefix
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
ws.Add(entries.WatchCh())
prefixLen := len(prefix)
sepLen := len(sep)
var keys []string
var lindex uint64
var last string
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
// Accumulate the high index
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
// Always accumulate if no separator provided
if sepLen == 0 {
keys = append(keys, e.Key)
continue
}
// Parse and de-duplicate the returned keys based on the
// key separator, if provided.
after := e.Key[prefixLen:]
sepIdx := strings.Index(after, sep)
if sepIdx > -1 {
key := e.Key[:prefixLen+sepIdx+sepLen]
if key != last {
keys = append(keys, key)
last = key
}
} else {
keys = append(keys, e.Key)
}
}
// Check for the highest index in the graveyard. If the prefix is empty
// then just use the full table indexes since we are listing everything.
if prefix != "" {
gindex, err := s.kvsGraveyard.GetMaxIndexTxn(tx, prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed graveyard lookup: %s", err)
}
if gindex > lindex {
lindex = gindex
}
} else {
lindex = idx
}
// Use the sub index if it was set and there are entries, otherwise use
// the full table index from above.
if lindex != 0 {
idx = lindex
}
return idx, keys, nil
return idx, entries, nil
}
// KVSDelete is used to perform a shallow delete on a single key in the
// the state store.
func (s *Store) KVSDelete(idx uint64, key string) error {
func (s *Store) KVSDelete(idx uint64, key string, entMeta *structs.EnterpriseMeta) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Perform the actual delete
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
if err := s.kvsDeleteTxn(tx, idx, key, entMeta); err != nil {
return err
}
@ -346,9 +251,9 @@ func (s *Store) KVSDelete(idx uint64, key string) error {
// kvsDeleteTxn is the inner method used to perform the actual deletion
// of a key/value pair within an existing transaction.
func (s *Store) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
func (s *Store) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string, entMeta *structs.EnterpriseMeta) error {
// Look up the entry in the state store.
entry, err := tx.First("kvs", "id", key)
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
@ -357,30 +262,22 @@ func (s *Store) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
}
// Create a tombstone.
if err := s.kvsGraveyard.InsertTxn(tx, key, idx); err != nil {
if err := s.kvsGraveyard.InsertTxn(tx, key, idx, entMeta); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
// Delete the entry and update the index.
if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
return s.kvsDeleteWithEntry(tx, entry.(*structs.DirEntry), idx)
}
// KVSDeleteCAS is used to try doing a KV delete operation with a given
// raft index. If the CAS index specified is not equal to the last
// observed index for the given key, then the call is a noop, otherwise
// a normal KV delete is invoked.
func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) {
tx := s.db.Txn(true)
defer tx.Abort()
set, err := s.kvsDeleteCASTxn(tx, idx, cidx, key)
set, err := s.kvsDeleteCASTxn(tx, idx, cidx, key, entMeta)
if !set || err != nil {
return false, err
}
@ -391,9 +288,9 @@ func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
// transaction.
func (s *Store) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) {
func (s *Store) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string, entMeta *structs.EnterpriseMeta) (bool, error) {
// Retrieve the existing kvs entry, if any exists.
entry, err := tx.First("kvs", "id", key)
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
@ -407,7 +304,7 @@ func (s *Store) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bo
}
// Call the actual deletion if the above passed.
if err := s.kvsDeleteTxn(tx, idx, key); err != nil {
if err := s.kvsDeleteTxn(tx, idx, key, entMeta); err != nil {
return false, err
}
return true, nil
@ -434,7 +331,7 @@ func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
// transaction.
func (s *Store) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
@ -462,11 +359,11 @@ func (s *Store) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry)
// KVSDeleteTree is used to do a recursive delete on a key prefix
// in the state store. If any keys are modified, the last index is
// set, otherwise this is a no-op.
func (s *Store) KVSDeleteTree(idx uint64, prefix string) error {
func (s *Store) KVSDeleteTree(idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.kvsDeleteTreeTxn(tx, idx, prefix); err != nil {
if err := s.kvsDeleteTreeTxn(tx, idx, prefix, entMeta); err != nil {
return err
}
@ -474,35 +371,10 @@ func (s *Store) KVSDeleteTree(idx uint64, prefix string) error {
return nil
}
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction.
func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
// For prefix deletes, only insert one tombstone and delete the entire subtree
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
if err != nil {
return fmt.Errorf("failed recursive deleting kvs entry: %s", err)
}
if deleted {
if prefix != "" { // don't insert a tombstone if the entire tree is deleted, all watchers on keys will see the max_index of the tree
if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
return nil
}
// KVSLockDelay returns the expiration time for any lock delay associated with
// the given key.
func (s *Store) KVSLockDelay(key string) time.Time {
return s.lockDelay.GetExpiration(key)
func (s *Store) KVSLockDelay(key string, entMeta *structs.EnterpriseMeta) time.Time {
return s.lockDelay.GetExpiration(key, entMeta)
}
// KVSLock is similar to KVSSet but only performs the set if the lock can be
@ -538,7 +410,7 @@ func (s *Store) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (
}
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
@ -595,7 +467,7 @@ func (s *Store) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry)
}
// Retrieve the existing entry.
existing, err := tx.First("kvs", "id", entry.Key)
existing, err := firstWithTxn(tx, "kvs", "id", entry.Key, &entry.EnterpriseMeta)
if err != nil {
return false, fmt.Errorf("failed kvs lookup: %s", err)
}
@ -626,8 +498,10 @@ func (s *Store) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry)
// kvsCheckSessionTxn checks to see if the given session matches the current
// entry for a key.
func (s *Store) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
func (s *Store) kvsCheckSessionTxn(tx *memdb.Txn,
key string, session string, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
}
@ -645,8 +519,10 @@ func (s *Store) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*
// kvsCheckIndexTxn checks to see if the given modify index matches the current
// entry for a key.
func (s *Store) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) {
entry, err := tx.First("kvs", "id", key)
func (s *Store) kvsCheckIndexTxn(tx *memdb.Txn,
key string, cidx uint64, entMeta *structs.EnterpriseMeta) (*structs.DirEntry, error) {
entry, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
return nil, fmt.Errorf("failed kvs lookup: %s", err)
}

View File

@ -0,0 +1,95 @@
// +build !consulent
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)
func kvsIndexer() *memdb.StringFieldIndex {
return &memdb.StringFieldIndex{
Field: "Key",
Lowercase: false,
}
}
func (s *Store) insertKVTxn(tx *memdb.Txn, entry *structs.DirEntry, updateMax bool) error {
if err := tx.Insert("kvs", entry); err != nil {
return err
}
if updateMax {
if err := indexUpdateMaxTxn(tx, entry.ModifyIndex, "kvs"); err != nil {
return fmt.Errorf("failed updating kvs index: %v", err)
}
} else {
if err := tx.Insert("index", &IndexEntry{"kvs", entry.ModifyIndex}); err != nil {
return fmt.Errorf("failed updating kvs index: %s", err)
}
}
return nil
}
func (s *Store) kvsListEntriesTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string, entMeta *structs.EnterpriseMeta) (uint64, structs.DirEntries, error) {
var ents structs.DirEntries
var lindex uint64
entries, err := tx.Get("kvs", "id_prefix", prefix)
if err != nil {
return 0, nil, fmt.Errorf("failed kvs lookup: %s", err)
}
ws.Add(entries.WatchCh())
// Gather all of the keys found
for entry := entries.Next(); entry != nil; entry = entries.Next() {
e := entry.(*structs.DirEntry)
ents = append(ents, e)
if e.ModifyIndex > lindex {
lindex = e.ModifyIndex
}
}
return lindex, ents, nil
}
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
// existing transaction.
func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string, entMeta *structs.EnterpriseMeta) error {
// For prefix deletes, only insert one tombstone and delete the entire subtree
deleted, err := tx.DeletePrefix("kvs", "id_prefix", prefix)
if err != nil {
return fmt.Errorf("failed recursive deleting kvs entry: %s", err)
}
if deleted {
if prefix != "" { // don't insert a tombstone if the entire tree is deleted, all watchers on keys will see the max_index of the tree
if err := s.kvsGraveyard.InsertTxn(tx, prefix, idx, entMeta); err != nil {
return fmt.Errorf("failed adding to graveyard: %s", err)
}
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
return nil
}
func kvsMaxIndex(tx *memdb.Txn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "kvs", "tombstones")
}
func (s *Store) kvsDeleteWithEntry(tx *memdb.Txn, entry *structs.DirEntry, idx uint64) error {
// Delete the entry and update the index.
if err := tx.Delete("kvs", entry); err != nil {
return fmt.Errorf("failed deleting kvs entry: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"kvs", idx}); err != nil {
return fmt.Errorf("failed updating kvs index: %s", err)
}
return nil
}

View File

@ -12,6 +12,74 @@ import (
"github.com/hashicorp/go-memdb"
)
func TestStateStore_ReapTombstones(t *testing.T) {
s := testStateStore(t)
// Create some KV pairs.
testSetKey(t, s, 1, "foo", "foo", nil)
testSetKey(t, s, 2, "foo/bar", "bar", nil)
testSetKey(t, s, 3, "foo/baz", "bar", nil)
testSetKey(t, s, 4, "foo/moo", "bar", nil)
testSetKey(t, s, 5, "foo/zoo", "bar", nil)
// Call a delete on some specific keys.
if err := s.KVSDelete(6, "foo/baz", nil); err != nil {
t.Fatalf("err: %s", err)
}
if err := s.KVSDelete(7, "foo/moo", nil); err != nil {
t.Fatalf("err: %s", err)
}
// Pull out the list and check the index, which should come from the
// tombstones.
idx, _, err := s.KVSList(nil, "foo/", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 7 {
t.Fatalf("bad index: %d", idx)
}
// Reap the tombstones <= 6.
if err := s.ReapTombstones(6); err != nil {
t.Fatalf("err: %s", err)
}
// Should still be good because 7 is in there.
idx, _, err = s.KVSList(nil, "foo/", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 7 {
t.Fatalf("bad index: %d", idx)
}
// Now reap them all.
if err := s.ReapTombstones(7); err != nil {
t.Fatalf("err: %s", err)
}
// At this point the sub index will slide backwards.
idx, _, err = s.KVSList(nil, "foo/", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
// Make sure the tombstones are actually gone.
snap := s.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}
func TestStateStore_GC(t *testing.T) {
// Build up a fast GC.
ttl := 10 * time.Millisecond
@ -29,14 +97,14 @@ func TestStateStore_GC(t *testing.T) {
}
// Create some KV pairs.
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
testSetKey(t, s, 3, "foo/baz", "bar")
testSetKey(t, s, 4, "foo/moo", "bar")
testSetKey(t, s, 5, "foo/zoo", "bar")
testSetKey(t, s, 1, "foo", "foo", nil)
testSetKey(t, s, 2, "foo/bar", "bar", nil)
testSetKey(t, s, 3, "foo/baz", "bar", nil)
testSetKey(t, s, 4, "foo/moo", "bar", nil)
testSetKey(t, s, 5, "foo/zoo", "bar", nil)
// Delete a key and make sure the GC sees it.
if err := s.KVSDelete(6, "foo/zoo"); err != nil {
if err := s.KVSDelete(6, "foo/zoo", nil); err != nil {
t.Fatalf("err: %s", err)
}
select {
@ -49,7 +117,7 @@ func TestStateStore_GC(t *testing.T) {
}
// Check for the same behavior with a tree delete.
if err := s.KVSDeleteTree(7, "foo/moo"); err != nil {
if err := s.KVSDeleteTree(7, "foo/moo", nil); err != nil {
t.Fatalf("err: %s", err)
}
select {
@ -62,7 +130,7 @@ func TestStateStore_GC(t *testing.T) {
}
// Check for the same behavior with a CAS delete.
if ok, err := s.KVSDeleteCAS(8, 3, "foo/baz"); !ok || err != nil {
if ok, err := s.KVSDeleteCAS(8, 3, "foo/baz", nil); !ok || err != nil {
t.Fatalf("err: %s", err)
}
select {
@ -104,80 +172,12 @@ func TestStateStore_GC(t *testing.T) {
}
}
func TestStateStore_ReapTombstones(t *testing.T) {
s := testStateStore(t)
// Create some KV pairs.
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
testSetKey(t, s, 3, "foo/baz", "bar")
testSetKey(t, s, 4, "foo/moo", "bar")
testSetKey(t, s, 5, "foo/zoo", "bar")
// Call a delete on some specific keys.
if err := s.KVSDelete(6, "foo/baz"); err != nil {
t.Fatalf("err: %s", err)
}
if err := s.KVSDelete(7, "foo/moo"); err != nil {
t.Fatalf("err: %s", err)
}
// Pull out the list and check the index, which should come from the
// tombstones.
idx, _, err := s.KVSList(nil, "foo/")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 7 {
t.Fatalf("bad index: %d", idx)
}
// Reap the tombstones <= 6.
if err := s.ReapTombstones(6); err != nil {
t.Fatalf("err: %s", err)
}
// Should still be good because 7 is in there.
idx, _, err = s.KVSList(nil, "foo/")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 7 {
t.Fatalf("bad index: %d", idx)
}
// Now reap them all.
if err := s.ReapTombstones(7); err != nil {
t.Fatalf("err: %s", err)
}
// At this point the sub index will slide backwards.
idx, _, err = s.KVSList(nil, "foo/")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 5 {
t.Fatalf("bad index: %d", idx)
}
// Make sure the tombstones are actually gone.
snap := s.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}
func TestStateStore_KVSSet_KVSGet(t *testing.T) {
s := testStateStore(t)
// Get on an nonexistent key returns nil.
ws := memdb.NewWatchSet()
idx, result, err := s.KVSGet(ws, "foo")
idx, result, err := s.KVSGet(ws, "foo", nil)
if result != nil || err != nil || idx != 0 {
t.Fatalf("expected (0, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
}
@ -196,7 +196,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
// Retrieve the K/V entry again.
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
idx, result, err = s.KVSGet(ws, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -231,7 +231,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
// Fetch the kv pair and check.
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
idx, result, err = s.KVSGet(ws, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -260,7 +260,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
// Fetch the kv pair and check.
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
idx, result, err = s.KVSGet(ws, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -298,7 +298,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
// Fetch the kv pair and check.
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
idx, result, err = s.KVSGet(ws, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -330,7 +330,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
// Fetch the kv pair and check.
ws = memdb.NewWatchSet()
idx, result, err = s.KVSGet(ws, "foo")
idx, result, err = s.KVSGet(ws, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -348,14 +348,14 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
}
// Setting some unrelated key should not fire the watch.
testSetKey(t, s, 8, "other", "yup")
testSetKey(t, s, 8, "other", "yup", nil)
if watchFired(ws) {
t.Fatalf("bad")
}
// Fetch a key that doesn't exist and make sure we get the right
// response.
idx, result, err = s.KVSGet(nil, "nope")
idx, result, err = s.KVSGet(nil, "nope", nil)
if result != nil || err != nil || idx != 8 {
t.Fatalf("expected (8, nil, nil), got : (%#v, %#v, %#v)", idx, result, err)
}
@ -370,7 +370,7 @@ func TestStateStore_KVSSet_KVSGet(t *testing.T) {
require.Nil(t, s.KVSSet(1, entry))
require.Nil(t, s.KVSSet(2, entry))
idx, _, err = s.KVSGet(ws, entry.Key)
idx, _, err = s.KVSGet(ws, entry.Key, nil)
require.Nil(t, err)
require.Equal(t, uint64(1), idx)
@ -381,23 +381,23 @@ func TestStateStore_KVSList(t *testing.T) {
// Listing an empty KVS returns nothing
ws := memdb.NewWatchSet()
idx, entries, err := s.KVSList(ws, "")
idx, entries, err := s.KVSList(ws, "", nil)
if idx != 0 || entries != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, entries, err)
}
// Create some KVS entries
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp")
testSetKey(t, s, 5, "foo/bar/baz", "baz")
testSetKey(t, s, 1, "foo", "foo", nil)
testSetKey(t, s, 2, "foo/bar", "bar", nil)
testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp", nil)
testSetKey(t, s, 5, "foo/bar/baz", "baz", nil)
if !watchFired(ws) {
t.Fatalf("bad")
}
// List out all of the keys
idx, entries, err = s.KVSList(nil, "")
idx, entries, err = s.KVSList(nil, "", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -411,7 +411,7 @@ func TestStateStore_KVSList(t *testing.T) {
}
// Try listing with a provided prefix
idx, entries, err = s.KVSList(nil, "foo/bar/zip")
idx, entries, err = s.KVSList(nil, "foo/bar/zip", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -429,18 +429,18 @@ func TestStateStore_KVSList(t *testing.T) {
// Delete a key and make sure the index comes from the tombstone.
ws = memdb.NewWatchSet()
idx, _, err = s.KVSList(ws, "foo/bar/baz")
idx, _, err = s.KVSList(ws, "foo/bar/baz", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if err := s.KVSDelete(6, "foo/bar/baz"); err != nil {
if err := s.KVSDelete(6, "foo/bar/baz", nil); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
idx, _, err = s.KVSList(ws, "foo/bar/baz")
idx, _, err = s.KVSList(ws, "foo/bar/baz", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -450,13 +450,13 @@ func TestStateStore_KVSList(t *testing.T) {
// Set a different key to bump the index. This shouldn't fire the
// watch since there's a different prefix.
testSetKey(t, s, 7, "some/other/key", "")
testSetKey(t, s, 7, "some/other/key", "", nil)
if watchFired(ws) {
t.Fatalf("bad")
}
// Make sure we get the right index from the tombstone.
idx, _, err = s.KVSList(nil, "foo/bar/baz")
idx, _, err = s.KVSList(nil, "foo/bar/baz", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -469,7 +469,7 @@ func TestStateStore_KVSList(t *testing.T) {
if err := s.ReapTombstones(6); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList(nil, "foo/bar/baz")
idx, _, err = s.KVSList(nil, "foo/bar/baz", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -478,7 +478,7 @@ func TestStateStore_KVSList(t *testing.T) {
}
// List all the keys to make sure the index is also correct.
idx, _, err = s.KVSList(nil, "")
idx, _, err = s.KVSList(nil, "", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -487,148 +487,22 @@ func TestStateStore_KVSList(t *testing.T) {
}
}
func TestStateStore_KVSListKeys(t *testing.T) {
s := testStateStore(t)
// Listing keys with no results returns nil.
ws := memdb.NewWatchSet()
idx, keys, err := s.KVSListKeys(ws, "", "")
if idx != 0 || keys != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, keys, err)
}
// Create some keys.
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
testSetKey(t, s, 3, "foo/bar/baz", "baz")
testSetKey(t, s, 4, "foo/bar/zip", "zip")
testSetKey(t, s, 5, "foo/bar/zip/zam", "zam")
testSetKey(t, s, 6, "foo/bar/zip/zorp", "zorp")
testSetKey(t, s, 7, "some/other/prefix", "nack")
if !watchFired(ws) {
t.Fatalf("bad")
}
// List all the keys.
idx, keys, err = s.KVSListKeys(nil, "", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(keys) != 7 {
t.Fatalf("bad keys: %#v", keys)
}
if idx != 7 {
t.Fatalf("bad index: %d", idx)
}
// Query using a prefix and pass a separator.
idx, keys, err = s.KVSListKeys(nil, "foo/bar/", "/")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(keys) != 3 {
t.Fatalf("bad keys: %#v", keys)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
// Subset of the keys was returned.
expect := []string{"foo/bar/baz", "foo/bar/zip", "foo/bar/zip/"}
if !reflect.DeepEqual(keys, expect) {
t.Fatalf("bad keys: %#v", keys)
}
// Listing keys with no separator returns everything.
idx, keys, err = s.KVSListKeys(nil, "foo", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
expect = []string{"foo", "foo/bar", "foo/bar/baz", "foo/bar/zip",
"foo/bar/zip/zam", "foo/bar/zip/zorp"}
if !reflect.DeepEqual(keys, expect) {
t.Fatalf("bad keys: %#v", keys)
}
// Delete a key and make sure the index comes from the tombstone.
ws = memdb.NewWatchSet()
idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if err := s.KVSDelete(8, "foo/bar/baz"); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
ws = memdb.NewWatchSet()
idx, _, err = s.KVSListKeys(ws, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Set a different key to bump the index. This shouldn't fire the watch
// since there's a different prefix.
testSetKey(t, s, 9, "some/other/key", "")
if watchFired(ws) {
t.Fatalf("bad")
}
// Make sure the index still comes from the tombstone.
idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
t.Fatalf("bad index: %d", idx)
}
// Now reap the tombstones and make sure we get the latest index
// since there are no matching keys.
if err := s.ReapTombstones(8); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSListKeys(nil, "foo/bar/baz", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 9 {
t.Fatalf("bad index: %d", idx)
}
// List all the keys to make sure the index is also correct.
idx, _, err = s.KVSListKeys(nil, "", "")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 9 {
t.Fatalf("bad index: %d", idx)
}
}
func TestStateStore_KVSDelete(t *testing.T) {
s := testStateStore(t)
// Create some KV pairs
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
testSetKey(t, s, 1, "foo", "foo", nil)
testSetKey(t, s, 2, "foo/bar", "bar", nil)
// Call a delete on a specific key
if err := s.KVSDelete(3, "foo"); err != nil {
if err := s.KVSDelete(3, "foo", nil); err != nil {
t.Fatalf("err: %s", err)
}
// The entry was removed from the state store
tx := s.db.Txn(false)
defer tx.Abort()
e, err := tx.First("kvs", "id", "foo")
e, err := firstWithTxn(tx, "kvs", "id", "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -637,7 +511,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
}
// Try fetching the other keys to ensure they still exist
e, err = tx.First("kvs", "id", "foo/bar")
e, err = firstWithTxn(tx, "kvs", "id", "foo/bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -652,7 +526,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
// Check that the tombstone was created and that prevents the index
// from sliding backwards.
idx, _, err := s.KVSList(nil, "foo")
idx, _, err := s.KVSList(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -665,7 +539,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
if err := s.ReapTombstones(3); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList(nil, "foo")
idx, _, err = s.KVSList(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -675,7 +549,7 @@ func TestStateStore_KVSDelete(t *testing.T) {
// Deleting a nonexistent key should be idempotent and not return an
// error
if err := s.KVSDelete(4, "foo"); err != nil {
if err := s.KVSDelete(4, "foo", nil); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("kvs"); idx != 3 {
@ -687,19 +561,19 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
s := testStateStore(t)
// Create some KV entries
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "bar", "bar")
testSetKey(t, s, 3, "baz", "baz")
testSetKey(t, s, 1, "foo", "foo", nil)
testSetKey(t, s, 2, "bar", "bar", nil)
testSetKey(t, s, 3, "baz", "baz", nil)
// Do a CAS delete with an index lower than the entry
ok, err := s.KVSDeleteCAS(4, 1, "bar")
ok, err := s.KVSDeleteCAS(4, 1, "bar", nil)
if ok || err != nil {
t.Fatalf("expected (false, nil), got: (%v, %#v)", ok, err)
}
// Check that the index is untouched and the entry
// has not been deleted.
idx, e, err := s.KVSGet(nil, "foo")
idx, e, err := s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -712,13 +586,13 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
// Do another CAS delete, this time with the correct index
// which should cause the delete to take place.
ok, err = s.KVSDeleteCAS(4, 2, "bar")
ok, err = s.KVSDeleteCAS(4, 2, "bar", nil)
if !ok || err != nil {
t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
}
// Entry was deleted and index was updated
idx, e, err = s.KVSGet(nil, "bar")
idx, e, err = s.KVSGet(nil, "bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -730,11 +604,11 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
}
// Add another key to bump the index.
testSetKey(t, s, 5, "some/other/key", "baz")
testSetKey(t, s, 5, "some/other/key", "baz", nil)
// Check that the tombstone was created and that prevents the index
// from sliding backwards.
idx, _, err = s.KVSList(nil, "bar")
idx, _, err = s.KVSList(nil, "bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -747,7 +621,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
if err := s.ReapTombstones(4); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList(nil, "bar")
idx, _, err = s.KVSList(nil, "bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -757,7 +631,7 @@ func TestStateStore_KVSDeleteCAS(t *testing.T) {
// A delete on a nonexistent key should be idempotent and not return an
// error
ok, err = s.KVSDeleteCAS(6, 2, "bar")
ok, err = s.KVSDeleteCAS(6, 2, "bar", nil)
if !ok || err != nil {
t.Fatalf("expected (true, nil), got: (%v, %#v)", ok, err)
}
@ -786,7 +660,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
// Check that nothing was actually stored
tx := s.db.Txn(false)
if e, err := tx.First("kvs", "id", "foo"); e != nil || err != nil {
if e, err := firstWithTxn(tx, "kvs", "id", "foo", nil); e != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", e, err)
}
tx.Abort()
@ -812,7 +686,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was inserted
idx, entry, err := s.KVSGet(nil, "foo")
idx, entry, err := s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -854,7 +728,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was not updated in the store
idx, entry, err = s.KVSGet(nil, "foo")
idx, entry, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -881,7 +755,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was updated
idx, entry, err = s.KVSGet(nil, "foo")
idx, entry, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -908,7 +782,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was updated, but the session should have been ignored.
idx, entry, err = s.KVSGet(nil, "foo")
idx, entry, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -953,7 +827,7 @@ func TestStateStore_KVSSetCAS(t *testing.T) {
}
// Entry was updated, and the lock status should have stayed the same.
idx, entry, err = s.KVSGet(nil, "foo")
idx, entry, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -970,14 +844,14 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
s := testStateStore(t)
// Create kvs entries in the state store.
testSetKey(t, s, 1, "foo/bar", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/zorp", "zorp")
testSetKey(t, s, 1, "foo/bar", "bar", nil)
testSetKey(t, s, 2, "foo/bar/baz", "baz", nil)
testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
testSetKey(t, s, 4, "foo/zorp", "zorp", nil)
// Calling tree deletion which affects nothing does not
// modify the table index.
if err := s.KVSDeleteTree(9, "bar"); err != nil {
if err := s.KVSDeleteTree(9, "bar", nil); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("kvs"); idx != 4 {
@ -985,7 +859,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
}
// Call tree deletion with a nested prefix.
if err := s.KVSDeleteTree(5, "foo/bar"); err != nil {
if err := s.KVSDeleteTree(5, "foo/bar", nil); err != nil {
t.Fatalf("err: %s", err)
}
@ -1017,7 +891,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
// Check that the tombstones ware created and that prevents the index
// from sliding backwards.
idx, _, err := s.KVSList(nil, "foo")
idx, _, err := s.KVSList(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1030,7 +904,7 @@ func TestStateStore_KVSDeleteTree(t *testing.T) {
if err := s.ReapTombstones(5); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList(nil, "foo")
idx, _, err = s.KVSList(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1043,15 +917,15 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) {
s := testStateStore(t)
// Create some KVS entries
testSetKey(t, s, 1, "foo", "foo")
testSetKey(t, s, 2, "foo/bar", "bar")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp")
testSetKey(t, s, 5, "foo/bar/zip/zap", "zap")
testSetKey(t, s, 6, "foo/nope", "nope")
testSetKey(t, s, 1, "foo", "foo", nil)
testSetKey(t, s, 2, "foo/bar", "bar", nil)
testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
testSetKey(t, s, 4, "foo/bar/zip/zorp", "zorp", nil)
testSetKey(t, s, 5, "foo/bar/zip/zap", "zap", nil)
testSetKey(t, s, 6, "foo/nope", "nope", nil)
ws := memdb.NewWatchSet()
got, _, err := s.KVSList(ws, "foo/bar")
got, _, err := s.KVSList(ws, "foo/bar", nil)
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
@ -1061,7 +935,7 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) {
}
// Delete a key and make sure the index comes from the tombstone.
if err := s.KVSDeleteTree(7, "foo/bar/zip"); err != nil {
if err := s.KVSDeleteTree(7, "foo/bar/zip", nil); err != nil {
t.Fatalf("unexpected err: %s", err)
}
// Make sure watch fires
@ -1070,7 +944,7 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) {
}
//Verify index matches tombstone
got, _, err = s.KVSList(ws, "foo/bar")
got, _, err = s.KVSList(ws, "foo/bar", nil)
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
@ -1088,7 +962,7 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) {
t.Fatalf("err: %s", err)
}
got, _, err = s.KVSList(nil, "foo/bar")
got, _, err = s.KVSList(nil, "foo/bar", nil)
wantIndex = 2
if err != nil {
t.Fatalf("err: %s", err)
@ -1099,13 +973,13 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) {
// Set a different key to bump the index. This shouldn't fire the
// watch since there's a different prefix.
testSetKey(t, s, 8, "some/other/key", "")
testSetKey(t, s, 8, "some/other/key", "", nil)
// Now ask for the index for a node within the prefix that was deleted
// We expect to get the max index in the tree
wantIndex = 8
ws = memdb.NewWatchSet()
got, _, err = s.KVSList(ws, "foo/bar/baz")
got, _, err = s.KVSList(ws, "foo/bar/baz", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1117,7 +991,7 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) {
}
// List all the keys to make sure the index returned is the max index
got, _, err = s.KVSList(nil, "")
got, _, err = s.KVSList(nil, "", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1126,11 +1000,11 @@ func TestStateStore_Watches_PrefixDelete(t *testing.T) {
}
// Delete all the keys, special case where tombstones are not inserted
if err := s.KVSDeleteTree(9, ""); err != nil {
if err := s.KVSDeleteTree(9, "", nil); err != nil {
t.Fatalf("unexpected err: %s", err)
}
wantIndex = 9
got, _, err = s.KVSList(nil, "/foo/bar")
got, _, err = s.KVSList(nil, "/foo/bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1145,7 +1019,7 @@ func TestStateStore_KVSLockDelay(t *testing.T) {
// KVSLockDelay is exercised in the lock/unlock and session invalidation
// cases below, so we just do a basic check on a nonexistent key here.
expires := s.KVSLockDelay("/not/there")
expires := s.KVSLockDelay("/not/there", nil)
if expires.After(time.Now()) {
t.Fatalf("bad: %v", expires)
}
@ -1180,7 +1054,7 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Make sure the indexes got set properly.
idx, result, err := s.KVSGet(nil, "foo")
idx, result, err := s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1201,7 +1075,7 @@ func TestStateStore_KVSLock(t *testing.T) {
// Make sure the indexes got set properly, note that the lock index
// won't go up since we didn't lock it again.
idx, result, err = s.KVSGet(nil, "foo")
idx, result, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1224,7 +1098,7 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Make sure the indexes got set properly.
idx, result, err = s.KVSGet(nil, "foo")
idx, result, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1237,14 +1111,14 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Lock an existing key.
testSetKey(t, s, 8, "bar", "bar")
testSetKey(t, s, 8, "bar", "bar", nil)
ok, err = s.KVSLock(9, &structs.DirEntry{Key: "bar", Value: []byte("xxx"), Session: session1})
if !ok || err != nil {
t.Fatalf("didn't get the lock: %v %s", ok, err)
}
// Make sure the indexes got set properly.
idx, result, err = s.KVSGet(nil, "bar")
idx, result, err = s.KVSGet(nil, "bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1270,7 +1144,7 @@ func TestStateStore_KVSLock(t *testing.T) {
}
// Make sure the indexes didn't update.
idx, result, err = s.KVSGet(nil, "bar")
idx, result, err = s.KVSGet(nil, "bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1307,14 +1181,14 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make a key and unlock it, without it being locked.
testSetKey(t, s, 4, "foo", "bar")
testSetKey(t, s, 4, "foo", "bar", nil)
ok, err = s.KVSUnlock(5, &structs.DirEntry{Key: "foo", Value: []byte("baz"), Session: session1})
if ok || err != nil {
t.Fatalf("didn't handle unlocking a non-locked key: %v %s", ok, err)
}
// Make sure the indexes didn't update.
idx, result, err := s.KVSGet(nil, "foo")
idx, result, err := s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1343,7 +1217,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make sure the indexes didn't update.
idx, result, err = s.KVSGet(nil, "foo")
idx, result, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1362,7 +1236,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make sure the indexes got set properly.
idx, result, err = s.KVSGet(nil, "foo")
idx, result, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1381,7 +1255,7 @@ func TestStateStore_KVSUnlock(t *testing.T) {
}
// Make sure the indexes didn't update.
idx, result, err = s.KVSGet(nil, "foo")
idx, result, err = s.KVSGet(nil, "foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1474,7 +1348,7 @@ func TestStateStore_KVS_Snapshot_Restore(t *testing.T) {
restore.Commit()
// Read the restored keys back out and verify they match.
idx, res, err := s.KVSList(nil, "")
idx, res, err := s.KVSList(nil, "", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1496,10 +1370,10 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
s := testStateStore(t)
// Insert a key and then delete it to create a tombstone.
testSetKey(t, s, 1, "foo/bar", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "bar")
testSetKey(t, s, 3, "foo/bar/zoo", "bar")
if err := s.KVSDelete(4, "foo/bar"); err != nil {
testSetKey(t, s, 1, "foo/bar", "bar", nil)
testSetKey(t, s, 2, "foo/bar/baz", "bar", nil)
testSetKey(t, s, 3, "foo/bar/zoo", "bar", nil)
if err := s.KVSDelete(4, "foo/bar", nil); err != nil {
t.Fatalf("err: %s", err)
}
@ -1511,7 +1385,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
if err := s.ReapTombstones(4); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err := s.KVSList(nil, "foo/bar")
idx, _, err := s.KVSList(nil, "foo/bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1548,7 +1422,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
restore.Commit()
// See if the stone works properly in a list query.
idx, _, err := s.KVSList(nil, "foo/bar")
idx, _, err := s.KVSList(nil, "foo/bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1562,7 +1436,7 @@ func TestStateStore_Tombstone_Snapshot_Restore(t *testing.T) {
if err := s.ReapTombstones(4); err != nil {
t.Fatalf("err: %s", err)
}
idx, _, err = s.KVSList(nil, "foo/bar")
idx, _, err = s.KVSList(nil, "foo/bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -25,10 +25,7 @@ func sessionsTableSchema() *memdb.TableSchema {
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
Indexer: nodeSessionsIndexer(),
},
},
}
@ -250,13 +247,10 @@ func (s *Store) NodeSessions(ws memdb.WatchSet, nodeID string, entMeta *structs.
idx := s.sessionMaxIndex(tx, entMeta)
// Get all of the sessions which belong to the node
sessions, err := tx.Get("sessions", "node", nodeID)
result, err := s.nodeSessionsTxn(tx, ws, nodeID, entMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
return 0, nil, err
}
ws.Add(sessions.WatchCh())
result := s.collectNodeSessions(sessions, entMeta)
return idx, result, nil
}
@ -329,19 +323,19 @@ func (s *Store) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string, en
// Apply the lock delay if present.
if delay > 0 {
s.lockDelay.SetExpiration(e.Key, now, delay)
s.lockDelay.SetExpiration(e.Key, now, delay, entMeta)
}
}
case structs.SessionKeysDelete:
for _, obj := range kvs {
e := obj.(*structs.DirEntry)
if err := s.kvsDeleteTxn(tx, idx, e.Key); err != nil {
if err := s.kvsDeleteTxn(tx, idx, e.Key, entMeta); err != nil {
return fmt.Errorf("failed kvs delete: %s", err)
}
// Apply the lock delay if present.
if delay > 0 {
s.lockDelay.SetExpiration(e.Key, now, delay)
s.lockDelay.SetExpiration(e.Key, now, delay, entMeta)
}
}
default:

View File

@ -15,13 +15,11 @@ func sessionIndexer() *memdb.UUIDFieldIndex {
}
}
func (s *Store) collectNodeSessions(sessions memdb.ResultIterator, entMeta *structs.EnterpriseMeta) structs.Sessions {
// Go over all of the sessions and return them as a slice
var result structs.Sessions
for s := sessions.Next(); s != nil; s = sessions.Next() {
result = append(result, s.(*structs.Session))
func nodeSessionsIndexer() *memdb.StringFieldIndex {
return &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
}
return result
}
func (s *Store) sessionDeleteWithSession(tx *memdb.Txn, session *structs.Session, idx uint64) error {
@ -69,6 +67,26 @@ func (s *Store) insertSessionTxn(tx *memdb.Txn, session *structs.Session, idx ui
return nil
}
func (s *Store) allNodeSessionsTxn(tx *memdb.Txn, node string) (structs.Sessions, error) {
return s.nodeSessionsTxn(tx, nil, node, nil)
}
func (s *Store) nodeSessionsTxn(tx *memdb.Txn,
ws memdb.WatchSet, node string, entMeta *structs.EnterpriseMeta) (structs.Sessions, error) {
sessions, err := tx.Get("sessions", "node", node)
if err != nil {
return nil, fmt.Errorf("failed session lookup: %s", err)
}
ws.Add(sessions.WatchCh())
var result structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
result = append(result, session.(*structs.Session))
}
return result, nil
}
func (s *Store) sessionMaxIndex(tx *memdb.Txn, entMeta *structs.EnterpriseMeta) uint64 {
return maxIndexTxn(tx, "sessions")
}

View File

@ -778,7 +778,7 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
}
// Key should be unlocked.
idx, d2, err := s.KVSGet(nil, "/foo")
idx, d2, err := s.KVSGet(nil, "/foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -796,7 +796,7 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
}
// Key should have a lock delay.
expires := s.KVSLockDelay("/foo")
expires := s.KVSLockDelay("/foo", nil)
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
t.Fatalf("Bad: %v", expires)
}
@ -860,7 +860,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
}
// Key should be deleted.
idx, d2, err := s.KVSGet(nil, "/bar")
idx, d2, err := s.KVSGet(nil, "/bar", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -872,7 +872,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
}
// Key should have a lock delay.
expires := s.KVSLockDelay("/bar")
expires := s.KVSLockDelay("/bar", nil)
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
t.Fatalf("Bad: %v", expires)
}

View File

@ -3,9 +3,8 @@ package state
import (
"errors"
"fmt"
"github.com/hashicorp/consul/types"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
)
var (

View File

@ -178,15 +178,22 @@ func testRegisterConnectNativeService(t *testing.T, s *Store, idx uint64, nodeID
require.NoError(t, s.EnsureService(idx, nodeID, svc))
}
func testSetKey(t *testing.T, s *Store, idx uint64, key, value string) {
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
func testSetKey(t *testing.T, s *Store, idx uint64, key, value string, entMeta *structs.EnterpriseMeta) {
entry := &structs.DirEntry{
Key: key,
Value: []byte(value),
}
if entMeta != nil {
entry.EnterpriseMeta = *entMeta
}
if err := s.KVSSet(idx, entry); err != nil {
t.Fatalf("err: %s", err)
}
tx := s.db.Txn(false)
defer tx.Abort()
e, err := tx.First("kvs", "id", key)
e, err := firstWithTxn(tx, "kvs", "id", key, entMeta)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -223,7 +230,7 @@ func TestStateStore_Restore_Abort(t *testing.T) {
}
restore.Abort()
idx, entries, err := s.KVSList(nil, "")
idx, entries, err := s.KVSList(nil, "", nil)
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -19,17 +19,17 @@ func (s *Store) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.
err = s.kvsSetTxn(tx, idx, entry, false)
case api.KVDelete:
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key)
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
case api.KVDeleteCAS:
var ok bool
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key)
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
if !ok && err == nil {
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
}
case api.KVDeleteTree:
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key)
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
case api.KVCAS:
var ok bool
@ -56,14 +56,14 @@ func (s *Store) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.
}
case api.KVGet:
_, entry, err = s.kvsGetTxn(tx, nil, op.DirEnt.Key)
_, entry, err = s.kvsGetTxn(tx, nil, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
if entry == nil && err == nil {
err = fmt.Errorf("key %q doesn't exist", op.DirEnt.Key)
}
case api.KVGetTree:
var entries structs.DirEntries
_, entries, err = s.kvsListTxn(tx, nil, op.DirEnt.Key)
_, entries, err = s.kvsListTxn(tx, nil, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
if err == nil {
results := make(structs.TxnResults, 0, len(entries))
for _, e := range entries {
@ -74,13 +74,13 @@ func (s *Store) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.
}
case api.KVCheckSession:
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session)
entry, err = s.kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session, &op.DirEnt.EnterpriseMeta)
case api.KVCheckIndex:
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex)
entry, err = s.kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex, &op.DirEnt.EnterpriseMeta)
case api.KVCheckNotExists:
_, entry, err = s.kvsGetTxn(tx, nil, op.DirEnt.Key)
_, entry, err = s.kvsGetTxn(tx, nil, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
if entry != nil && err == nil {
err = fmt.Errorf("key %q exists", op.DirEnt.Key)
}
@ -110,6 +110,23 @@ func (s *Store) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.
return nil, nil
}
// txnSession handles all Session-related operations.
func (s *Store) txnSession(tx *memdb.Txn, idx uint64, op *structs.TxnSessionOp) error {
var err error
switch op.Verb {
case api.SessionDelete:
err = s.sessionDeleteWithSession(tx, &op.Session, idx)
default:
err = fmt.Errorf("unknown Session verb %q", op.Verb)
}
if err != nil {
return fmt.Errorf("failed to delete session: %v", err)
}
return nil
}
// txnIntention handles all Intention-related operations.
func (s *Store) txnIntention(tx *memdb.Txn, idx uint64, op *structs.TxnIntentionOp) error {
switch op.Op {
@ -332,6 +349,8 @@ func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (stru
ret, err = s.txnService(tx, idx, op.Service)
case op.Check != nil:
ret, err = s.txnCheck(tx, idx, op.Check)
case op.Session != nil:
err = s.txnSession(tx, idx, op.Session)
default:
err = fmt.Errorf("no operation specified")
}

View File

@ -2,7 +2,6 @@ package state
import (
"fmt"
"reflect"
"strings"
"testing"
@ -500,11 +499,11 @@ func TestStateStore_Txn_KVS(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 4, "foo/zorp", "zorp")
testSetKey(t, s, 5, "foo/update", "stale")
testSetKey(t, s, 1, "foo/delete", "bar", nil)
testSetKey(t, s, 2, "foo/bar/baz", "baz", nil)
testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
testSetKey(t, s, 4, "foo/zorp", "zorp", nil)
testSetKey(t, s, 5, "foo/update", "stale", nil)
// Make a real session.
testRegisterNode(t, s, 6, "node1")
@ -776,14 +775,23 @@ func TestStateStore_Txn_KVS(t *testing.T) {
if len(results) != len(expected) {
t.Fatalf("bad: %v", results)
}
for i := range results {
if !reflect.DeepEqual(results[i], expected[i]) {
t.Fatalf("bad %d", i)
for i, e := range expected {
if e.KV.Key != results[i].KV.Key {
t.Fatalf("expected key %s, got %s", e.KV.Key, results[i].KV.Key)
}
if e.KV.LockIndex != results[i].KV.LockIndex {
t.Fatalf("expected lock index %d, got %d", e.KV.LockIndex, results[i].KV.LockIndex)
}
if e.KV.CreateIndex != results[i].KV.CreateIndex {
t.Fatalf("expected create index %d, got %d", e.KV.CreateIndex, results[i].KV.CreateIndex)
}
if e.KV.ModifyIndex != results[i].KV.ModifyIndex {
t.Fatalf("expected modify index %d, got %d", e.KV.ModifyIndex, results[i].KV.ModifyIndex)
}
}
// Pull the resulting state store contents.
idx, actual, err := s.KVSList(nil, "")
idx, actual, err := s.KVSList(nil, "", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -821,9 +829,21 @@ func TestStateStore_Txn_KVS(t *testing.T) {
if len(actual) != len(entries) {
t.Fatalf("bad len: %d != %d", len(actual), len(entries))
}
for i := range actual {
if !reflect.DeepEqual(actual[i], entries[i]) {
t.Fatalf("bad %d", i)
for i, e := range entries {
if e.Key != actual[i].Key {
t.Fatalf("expected key %s, got %s", e.Key, actual[i].Key)
}
if string(e.Value) != string(actual[i].Value) {
t.Fatalf("expected value %s, got %s", e.Value, actual[i].Value)
}
if e.LockIndex != actual[i].LockIndex {
t.Fatalf("expected lock index %d, got %d", e.LockIndex, actual[i].LockIndex)
}
if e.CreateIndex != actual[i].CreateIndex {
t.Fatalf("expected create index %d, got %d", e.CreateIndex, actual[i].CreateIndex)
}
if e.ModifyIndex != actual[i].ModifyIndex {
t.Fatalf("expected modify index %d, got %d", e.ModifyIndex, actual[i].ModifyIndex)
}
}
}
@ -832,8 +852,8 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo/delete", "bar")
testSetKey(t, s, 2, "foo/update", "stale")
testSetKey(t, s, 1, "foo/delete", "bar", nil)
testSetKey(t, s, 2, "foo/update", "stale", nil)
testRegisterNode(t, s, 3, "node1")
session := testUUID()
@ -852,7 +872,7 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
// This function verifies that the state store wasn't changed.
verifyStateStore := func(desc string) {
idx, actual, err := s.KVSList(nil, "")
idx, actual, err := s.KVSList(nil, "", nil)
if err != nil {
t.Fatalf("err (%s): %s", desc, err)
}
@ -892,9 +912,21 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
if len(actual) != len(entries) {
t.Fatalf("bad len (%s): %d != %d", desc, len(actual), len(entries))
}
for i := range actual {
if !reflect.DeepEqual(actual[i], entries[i]) {
t.Fatalf("bad (%s): op %d: %v != %v", desc, i, *(actual[i]), *(entries[i]))
for i, e := range entries {
if e.Key != actual[i].Key {
t.Fatalf("expected key %s, got %s", e.Key, actual[i].Key)
}
if string(e.Value) != string(actual[i].Value) {
t.Fatalf("expected value %s, got %s", e.Value, actual[i].Value)
}
if e.LockIndex != actual[i].LockIndex {
t.Fatalf("expected lock index %d, got %d", e.LockIndex, actual[i].LockIndex)
}
if e.CreateIndex != actual[i].CreateIndex {
t.Fatalf("expected create index %d, got %d", e.CreateIndex, actual[i].CreateIndex)
}
if e.ModifyIndex != actual[i].ModifyIndex {
t.Fatalf("expected modify index %d, got %d", e.ModifyIndex, actual[i].ModifyIndex)
}
}
}
@ -1027,9 +1059,9 @@ func TestStateStore_Txn_KVS_RO(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 1, "foo", "bar", nil)
testSetKey(t, s, 2, "foo/bar/baz", "baz", nil)
testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
// Set up a transaction that hits all the read-only operations.
ops := structs.TxnOps{
@ -1129,9 +1161,18 @@ func TestStateStore_Txn_KVS_RO(t *testing.T) {
if len(results) != len(expected) {
t.Fatalf("bad: %v", results)
}
for i := range results {
if !reflect.DeepEqual(results[i], expected[i]) {
t.Fatalf("bad %d", i)
for i, e := range expected {
if e.KV.Key != results[i].KV.Key {
t.Fatalf("expected key %s, got %s", e.KV.Key, results[i].KV.Key)
}
if e.KV.LockIndex != results[i].KV.LockIndex {
t.Fatalf("expected lock index %d, got %d", e.KV.LockIndex, results[i].KV.LockIndex)
}
if e.KV.CreateIndex != results[i].KV.CreateIndex {
t.Fatalf("expected create index %d, got %d", e.KV.CreateIndex, results[i].KV.CreateIndex)
}
if e.KV.ModifyIndex != results[i].KV.ModifyIndex {
t.Fatalf("expected modify index %d, got %d", e.KV.ModifyIndex, results[i].KV.ModifyIndex)
}
}
}
@ -1140,9 +1181,9 @@ func TestStateStore_Txn_KVS_RO_Safety(t *testing.T) {
s := testStateStore(t)
// Create KV entries in the state store.
testSetKey(t, s, 1, "foo", "bar")
testSetKey(t, s, 2, "foo/bar/baz", "baz")
testSetKey(t, s, 3, "foo/bar/zip", "zip")
testSetKey(t, s, 1, "foo", "bar", nil)
testSetKey(t, s, 2, "foo/bar/baz", "baz", nil)
testSetKey(t, s, 3, "foo/bar/zip", "zip", nil)
// Set up a transaction that hits all the read-only operations.
ops := structs.TxnOps{

View File

@ -14,7 +14,6 @@ import (
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/require"
)
@ -214,7 +213,7 @@ func TestTxn_Apply(t *testing.T) {
// Verify the state store directly.
state := s1.fsm.State()
_, d, err := state.KVSGet(nil, "test")
_, d, err := state.KVSGet(nil, "test", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -262,6 +261,7 @@ func TestTxn_Apply(t *testing.T) {
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
EnterpriseMeta: d.EnterpriseMeta,
},
},
&structs.TxnResult{
@ -273,6 +273,7 @@ func TestTxn_Apply(t *testing.T) {
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
EnterpriseMeta: d.EnterpriseMeta,
},
},
&structs.TxnResult{
@ -295,7 +296,7 @@ func TestTxn_Apply(t *testing.T) {
},
},
}
verify.Values(t, "", out, expected)
require.Equal(t, expected, out)
}
func TestTxn_Apply_ACLDeny(t *testing.T) {
@ -609,7 +610,7 @@ func TestTxn_Apply_ACLDeny(t *testing.T) {
}
}
verify.Values(t, "", out, expected)
require.Equal(expected, out)
}
func TestTxn_Apply_LockDelay(t *testing.T) {
@ -778,6 +779,8 @@ func TestTxn_Read(t *testing.T) {
// Verify the transaction's return value.
svc.Weights = &structs.Weights{Passing: 1, Warning: 1}
svc.RaftIndex = structs.RaftIndex{CreateIndex: 3, ModifyIndex: 3}
entMeta := out.Results[0].KV.EnterpriseMeta
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
@ -789,6 +792,7 @@ func TestTxn_Read(t *testing.T) {
CreateIndex: 1,
ModifyIndex: 1,
},
EnterpriseMeta: entMeta,
},
},
&structs.TxnResult{
@ -806,7 +810,7 @@ func TestTxn_Read(t *testing.T) {
KnownLeader: true,
},
}
verify.Values(t, "", out, expected)
require.Equal(expected, out)
}
func TestTxn_Read_ACLDeny(t *testing.T) {

View File

@ -18,6 +18,7 @@ func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (i
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
s.parseEntMeta(req, &args.EnterpriseMeta)
// Pull out the key name, validation left to each sub-handler
args.Key = strings.TrimPrefix(req.URL.Path, "/v1/kv/")
@ -96,10 +97,11 @@ func (s *HTTPServer) KVSGetKeys(resp http.ResponseWriter, req *http.Request, arg
// Construct the args
listArgs := structs.KeyListRequest{
Datacenter: args.Datacenter,
Prefix: args.Key,
Seperator: sep,
QueryOptions: args.QueryOptions,
Datacenter: args.Datacenter,
Prefix: args.Key,
Seperator: sep,
EnterpriseMeta: args.EnterpriseMeta,
QueryOptions: args.QueryOptions,
}
// Make the RPC
@ -135,9 +137,10 @@ func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *s
Datacenter: args.Datacenter,
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: args.Key,
Flags: 0,
Value: nil,
Key: args.Key,
Flags: 0,
Value: nil,
EnterpriseMeta: args.EnterpriseMeta,
},
}
applyReq.Token = args.Token
@ -210,7 +213,8 @@ func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args
Datacenter: args.Datacenter,
Op: api.KVDelete,
DirEnt: structs.DirEntry{
Key: args.Key,
Key: args.Key,
EnterpriseMeta: args.EnterpriseMeta,
},
}
applyReq.Token = args.Token

View File

@ -1637,6 +1637,7 @@ func (d *DirEntry) Clone() *DirEntry {
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
EnterpriseMeta: d.EnterpriseMeta,
}
}
@ -1680,6 +1681,7 @@ type KeyListRequest struct {
Prefix string
Seperator string
QueryOptions
EnterpriseMeta
}
func (r *KeyListRequest) RequestDatacenter() string {

View File

@ -49,10 +49,17 @@ type TxnCheckOp struct {
Check HealthCheck
}
// TxnCheckResult is used to define the result of a single operation on a health
// check inside a transaction.
// TxnCheckResult is used to define the result of a single operation on a
// session inside a transaction.
type TxnCheckResult *HealthCheck
// TxnSessionOp is used to define a single operation on a session inside a
// transaction.
type TxnSessionOp struct {
Verb api.SessionOp
Session Session
}
// TxnKVOp is used to define a single operation on an Intention inside a
// transaction.
type TxnIntentionOp IntentionRequest
@ -65,6 +72,7 @@ type TxnOp struct {
Node *TxnNodeOp
Service *TxnServiceOp
Check *TxnCheckOp
Session *TxnSessionOp
}
// TxnOps is a list of operations within a transaction.

View File

@ -6,18 +6,16 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/raft"
"github.com/pascaldekloe/goe/verify"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/assert"
)
func TestTxnEndpoint_Bad_JSON(t *testing.T) {
@ -213,7 +211,10 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
if len(txnResp.Results) != 2 {
t.Fatalf("bad: %v", txnResp)
}
index = txnResp.Results[0].KV.ModifyIndex
entMeta := txnResp.Results[0].KV.EnterpriseMeta
expected := structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
@ -227,6 +228,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
CreateIndex: index,
ModifyIndex: index,
},
EnterpriseMeta: entMeta,
},
},
&structs.TxnResult{
@ -240,13 +242,12 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
CreateIndex: index,
ModifyIndex: index,
},
EnterpriseMeta: entMeta,
},
},
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
assert.Equal(t, expected, txnResp)
}
// Do a read-only transaction that should get routed to the
@ -291,6 +292,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
if !ok {
t.Fatalf("bad type: %T", obj)
}
entMeta := txnResp.Results[0].KV.EnterpriseMeta
expected := structs.TxnReadResponse{
TxnResponse: structs.TxnResponse{
Results: structs.TxnResults{
@ -305,6 +307,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
CreateIndex: index,
ModifyIndex: index,
},
EnterpriseMeta: entMeta,
},
},
&structs.TxnResult{
@ -318,6 +321,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
CreateIndex: index,
ModifyIndex: index,
},
EnterpriseMeta: entMeta,
},
},
},
@ -326,9 +330,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
KnownLeader: true,
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
assert.Equal(t, expected, txnResp)
}
// Now that we have an index we can do a CAS to make sure the
@ -369,7 +371,10 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
if len(txnResp.Results) != 2 {
t.Fatalf("bad: %v", txnResp)
}
modIndex := txnResp.Results[0].KV.ModifyIndex
entMeta := txnResp.Results[0].KV.EnterpriseMeta
expected := structs.TxnResponse{
Results: structs.TxnResults{
&structs.TxnResult{
@ -381,6 +386,7 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
CreateIndex: index,
ModifyIndex: modIndex,
},
EnterpriseMeta: entMeta,
},
},
&structs.TxnResult{
@ -392,13 +398,12 @@ func TestTxnEndpoint_KV_Actions(t *testing.T) {
CreateIndex: index,
ModifyIndex: modIndex,
},
EnterpriseMeta: entMeta,
},
},
},
}
if !reflect.DeepEqual(txnResp, expected) {
t.Fatalf("bad: %v", txnResp)
}
assert.Equal(t, expected, txnResp)
}
})
@ -601,7 +606,7 @@ func TestTxnEndpoint_UpdateCheck(t *testing.T) {
},
},
}
verify.Values(t, "", txnResp, expected)
assert.Equal(t, expected, txnResp)
}
func TestConvertOps_ContentLength(t *testing.T) {

View File

@ -300,6 +300,10 @@ type Config struct {
// If provided it is read once at startup and never again.
TokenFile string
// Namespace is the name of the namespace to send along for the request
// when no other Namespace ispresent in the QueryOptions
Namespace string
TLSConfig TLSConfig
}
@ -801,6 +805,9 @@ func (c *Client) newRequest(method, path string) *request {
if c.config.Datacenter != "" {
r.params.Set("dc", c.config.Datacenter)
}
if c.config.Namespace != "" {
r.params.Set("ns", c.config.Namespace)
}
if c.config.WaitTime != 0 {
r.params.Set("wait", durToMsec(r.config.WaitTime))
}

View File

@ -40,6 +40,10 @@ type KVPair struct {
// interactions with this key over the same session must specify the same
// session ID.
Session string
// Namespace is the namespace the KVPair is associated with
// Namespacing is a Consul Enterprise feature.
Namespace string
}
// KVPairs is a list of KVPair objects

View File

@ -79,6 +79,7 @@ type LockOptions struct {
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
LockWaitTime time.Duration // Optional, defaults to DefaultLockWaitTime
LockTryOnce bool // Optional, defaults to false which means try forever
Namespace string // Optional, defaults to API client config, namespace of ACL token, or "default" namespace
}
// LockKey returns a handle to a lock struct which can be used
@ -140,6 +141,10 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
return nil, ErrLockHeld
}
wOpts := WriteOptions{
Namespace: l.opts.Namespace,
}
// Check if we need to create a session first
l.lockSession = l.opts.Session
if l.lockSession == "" {
@ -150,8 +155,9 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
l.sessionRenew = make(chan struct{})
l.lockSession = s
session := l.c.Session()
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
go session.RenewPeriodic(l.opts.SessionTTL, s, &wOpts, l.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
@ -164,8 +170,9 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Setup the query options
kv := l.c.KV()
qOpts := &QueryOptions{
qOpts := QueryOptions{
WaitTime: l.opts.LockWaitTime,
Namespace: l.opts.Namespace,
}
start := time.Now()
@ -191,7 +198,7 @@ WAIT:
attempts++
// Look for an existing lock, blocking until not taken
pair, meta, err := kv.Get(l.opts.Key, qOpts)
pair, meta, err := kv.Get(l.opts.Key, &qOpts)
if err != nil {
return nil, fmt.Errorf("failed to read lock: %v", err)
}
@ -209,7 +216,8 @@ WAIT:
// Try to acquire the lock
pair = l.lockEntry(l.lockSession)
locked, _, err = kv.Acquire(pair, nil)
locked, _, err = kv.Acquire(pair, &wOpts)
if err != nil {
return nil, fmt.Errorf("failed to acquire lock: %v", err)
}
@ -218,7 +226,7 @@ WAIT:
if !locked {
// Determine why the lock failed
qOpts.WaitIndex = 0
pair, meta, err = kv.Get(l.opts.Key, qOpts)
pair, meta, err = kv.Get(l.opts.Key, &qOpts)
if pair != nil && pair.Session != "" {
//If the session is not null, this means that a wait can safely happen
//using a long poll
@ -277,7 +285,9 @@ func (l *Lock) Unlock() error {
// Release the lock explicitly
kv := l.c.KV()
_, _, err := kv.Release(lockEnt, nil)
w := WriteOptions{Namespace: l.opts.Namespace}
_, _, err := kv.Release(lockEnt, &w)
if err != nil {
return fmt.Errorf("failed to release lock: %v", err)
}
@ -298,7 +308,9 @@ func (l *Lock) Destroy() error {
// Look for an existing lock
kv := l.c.KV()
pair, _, err := kv.Get(l.opts.Key, nil)
q := QueryOptions{Namespace: l.opts.Namespace}
pair, _, err := kv.Get(l.opts.Key, &q)
if err != nil {
return fmt.Errorf("failed to read lock: %v", err)
}
@ -319,7 +331,8 @@ func (l *Lock) Destroy() error {
}
// Attempt the delete
didRemove, _, err := kv.DeleteCAS(pair, nil)
w := WriteOptions{Namespace: l.opts.Namespace}
didRemove, _, err := kv.DeleteCAS(pair, &w)
if err != nil {
return fmt.Errorf("failed to remove lock: %v", err)
}
@ -339,7 +352,8 @@ func (l *Lock) createSession() (string, error) {
TTL: l.opts.SessionTTL,
}
}
id, _, err := session.Create(se, nil)
w := WriteOptions{Namespace: l.opts.Namespace}
id, _, err := session.Create(se, &w)
if err != nil {
return "", err
}
@ -361,11 +375,14 @@ func (l *Lock) lockEntry(session string) *KVPair {
func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
defer close(stopCh)
kv := l.c.KV()
opts := &QueryOptions{RequireConsistent: true}
opts := QueryOptions{
RequireConsistent: true,
Namespace: l.opts.Namespace,
}
WAIT:
retries := l.opts.MonitorRetries
RETRY:
pair, meta, err := kv.Get(l.opts.Key, opts)
pair, meta, err := kv.Get(l.opts.Key, &opts)
if err != nil {
// If configured we can try to ride out a brief Consul unavailability
// by doing retries. Note that we have to attempt the retry in a non-

View File

@ -73,6 +73,7 @@ type SemaphoreOptions struct {
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime
SemaphoreTryOnce bool // Optional, defaults to false which means try forever
Namespace string // Optional, defaults to API client config, namespace of ACL token, or "default" namespace
}
// semaphoreLock is written under the DefaultSemaphoreKey and
@ -176,14 +177,17 @@ func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Create the contender entry
kv := s.c.KV()
made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), nil)
wOpts := WriteOptions{Namespace: s.opts.Namespace}
made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), &wOpts)
if err != nil || !made {
return nil, fmt.Errorf("failed to make contender entry: %v", err)
}
// Setup the query options
qOpts := &QueryOptions{
qOpts := QueryOptions{
WaitTime: s.opts.SemaphoreWaitTime,
Namespace: s.opts.Namespace,
}
start := time.Now()
@ -209,7 +213,7 @@ WAIT:
attempts++
// Read the prefix
pairs, meta, err := kv.List(s.opts.Prefix, qOpts)
pairs, meta, err := kv.List(s.opts.Prefix, &qOpts)
if err != nil {
return nil, fmt.Errorf("failed to read prefix: %v", err)
}
@ -247,7 +251,7 @@ WAIT:
}
// Attempt the acquisition
didSet, _, err := kv.CAS(newLock, nil)
didSet, _, err := kv.CAS(newLock, &wOpts)
if err != nil {
return nil, fmt.Errorf("failed to update lock: %v", err)
}
@ -298,8 +302,12 @@ func (s *Semaphore) Release() error {
// Remove ourselves as a lock holder
kv := s.c.KV()
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
wOpts := WriteOptions{Namespace: s.opts.Namespace}
qOpts := QueryOptions{Namespace: s.opts.Namespace}
READ:
pair, _, err := kv.Get(key, nil)
pair, _, err := kv.Get(key, &qOpts)
if err != nil {
return err
}
@ -320,7 +328,7 @@ READ:
}
// Swap the locks
didSet, _, err := kv.CAS(newLock, nil)
didSet, _, err := kv.CAS(newLock, &wOpts)
if err != nil {
return fmt.Errorf("failed to update lock: %v", err)
}
@ -331,7 +339,7 @@ READ:
// Destroy the contender entry
contenderKey := path.Join(s.opts.Prefix, lockSession)
if _, err := kv.Delete(contenderKey, nil); err != nil {
if _, err := kv.Delete(contenderKey, &wOpts); err != nil {
return err
}
return nil
@ -351,7 +359,9 @@ func (s *Semaphore) Destroy() error {
// List for the semaphore
kv := s.c.KV()
pairs, _, err := kv.List(s.opts.Prefix, nil)
q := QueryOptions{Namespace: s.opts.Namespace}
pairs, _, err := kv.List(s.opts.Prefix, &q)
if err != nil {
return fmt.Errorf("failed to read prefix: %v", err)
}
@ -380,7 +390,8 @@ func (s *Semaphore) Destroy() error {
}
// Attempt the delete
didRemove, _, err := kv.DeleteCAS(lockPair, nil)
w := WriteOptions{Namespace: s.opts.Namespace}
didRemove, _, err := kv.DeleteCAS(lockPair, &w)
if err != nil {
return fmt.Errorf("failed to remove semaphore: %v", err)
}
@ -398,7 +409,9 @@ func (s *Semaphore) createSession() (string, error) {
TTL: s.opts.SessionTTL,
Behavior: SessionBehaviorDelete,
}
id, _, err := session.Create(se, nil)
w := WriteOptions{Namespace: s.opts.Namespace}
id, _, err := session.Create(se, &w)
if err != nil {
return "", err
}
@ -483,11 +496,14 @@ func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) {
func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) {
defer close(stopCh)
kv := s.c.KV()
opts := &QueryOptions{RequireConsistent: true}
opts := QueryOptions{
RequireConsistent: true,
Namespace: s.opts.Namespace,
}
WAIT:
retries := s.opts.MonitorRetries
RETRY:
pairs, meta, err := kv.List(s.opts.Prefix, opts)
pairs, meta, err := kv.List(s.opts.Prefix, &opts)
if err != nil {
// If configured we can try to ride out a brief Consul unavailability
// by doing retries. Note that we have to attempt the retry in a non-

View File

@ -93,6 +93,19 @@ type KVTxnResponse struct {
Errors TxnErrors
}
// SessionOp constants give possible operations available in a transaction.
type SessionOp string
const (
SessionDelete SessionOp = "delete"
)
// SessionTxnOp defines a single operation inside a transaction.
type SessionTxnOp struct {
Verb SessionOp
Session Session
}
// NodeOp constants give possible operations available in a transaction.
type NodeOp string

View File

@ -151,6 +151,7 @@ func TestAPI_ClientTxn(t *testing.T) {
LockIndex: 1,
CreateIndex: ret.Results[0].KV.CreateIndex,
ModifyIndex: ret.Results[0].KV.ModifyIndex,
Namespace: ret.Results[0].KV.Namespace,
},
},
&TxnResult{
@ -161,6 +162,7 @@ func TestAPI_ClientTxn(t *testing.T) {
LockIndex: 1,
CreateIndex: ret.Results[1].KV.CreateIndex,
ModifyIndex: ret.Results[1].KV.ModifyIndex,
Namespace: ret.Results[0].KV.Namespace,
},
},
&TxnResult{
@ -253,6 +255,7 @@ func TestAPI_ClientTxn(t *testing.T) {
LockIndex: 1,
CreateIndex: ret.Results[0].KV.CreateIndex,
ModifyIndex: ret.Results[0].KV.ModifyIndex,
Namespace: ret.Results[0].KV.Namespace,
},
},
&TxnResult{

View File

@ -18,6 +18,7 @@ type HTTPFlags struct {
certFile StringValue
keyFile StringValue
tlsServerName StringValue
namespace StringValue
// server flags
datacenter StringValue
@ -55,6 +56,10 @@ func (f *HTTPFlags) ClientFlags() *flag.FlagSet {
fs.Var(&f.tlsServerName, "tls-server-name",
"The server name to use as the SNI host when connecting via TLS. This "+
"can also be specified via the CONSUL_TLS_SERVER_NAME environment variable.")
// TODO (namespaces) Do we want to allow setting via an env var? CONSUL_NAMESPACE
fs.Var(&f.namespace, "ns",
"Specifies the namespace to query. If not provided, the namespace will be inferred +"+
"from the request's ACL token, or will default to the `default` namespace.")
return fs
}
@ -135,4 +140,5 @@ func (f *HTTPFlags) MergeOntoConfig(c *api.Config) {
f.keyFile.Merge(&c.TLSConfig.KeyFile)
f.tlsServerName.Merge(&c.TLSConfig.Address)
f.datacenter.Merge(&c.Datacenter)
f.namespace.Merge(&c.Namespace)
}

View File

@ -198,6 +198,9 @@ func prettyKVPair(w io.Writer, pair *api.KVPair, base64EncodeValue bool) error {
} else {
fmt.Fprintf(tw, "Session\t%s\n", pair.Session)
}
if pair.Namespace != "" {
fmt.Fprintf(tw, "Namespace\t%s\n", pair.Namespace)
}
if base64EncodeValue {
fmt.Fprintf(tw, "Value\t%s", base64.StdEncoding.EncodeToString(pair.Value))
} else {

View File

@ -80,7 +80,8 @@ func (c *cmd) Run(args []string) int {
Value: value,
}
if _, err := client.KV().Put(pair, nil); err != nil {
w := api.WriteOptions{Namespace: entry.Namespace}
if _, err := client.KV().Put(pair, &w); err != nil {
c.UI.Error(fmt.Sprintf("Error! Failed writing data for key %s: %s", pair.Key, err))
return 1
}

View File

@ -7,15 +7,17 @@ import (
)
type Entry struct {
Key string `json:"key"`
Flags uint64 `json:"flags"`
Value string `json:"value"`
Key string `json:"key"`
Flags uint64 `json:"flags"`
Value string `json:"value"`
Namespace string `json:"namespace,omitempty"`
}
func ToEntry(pair *api.KVPair) *Entry {
return &Entry{
Key: pair.Key,
Flags: pair.Flags,
Value: base64.StdEncoding.EncodeToString(pair.Value),
Key: pair.Key,
Flags: pair.Flags,
Value: base64.StdEncoding.EncodeToString(pair.Value),
Namespace: pair.Namespace,
}
}

View File

@ -62,11 +62,16 @@ The table below shows this endpoint's support for
metadata). Specifying this implies `recurse`. This is specified as part of the
URL as a query parameter.
- `separator` `(string: '')` - Specifies the string to use as a separator
- `separator` `(string: "")` - Specifies the string to use as a separator
for recursive key lookups. This option is only used when paired with the `keys`
parameter to limit the prefix of keys returned, only up to the given separator.
This is specified as part of the URL as a query parameter.
- `ns` `(string: "")` - **Enterprise Only** Specifies the namespace to query.
If not provided, the namespace will be inferred from the request's ACL token,
or will default to the `default` namespace. This is specified as part of the
URL as a query parameter.
### Sample Request
```text
@ -201,6 +206,11 @@ The table below shows this endpoint's support for
will leave the `LockIndex` unmodified but will clear the associated `Session`
of the key. The key must be held by this session to be unlocked.
- `ns` `(string: "")` - **Enterprise Only** Specifies the namespace to query.
If not provided, the namespace will be inferred from the request's ACL token,
or will default to the `default` namespace. This is specified as part of the
URL as a query parameter.
### Sample Payload
The payload is arbitrary, and is loaded directly into Consul as supplied.
@ -257,6 +267,11 @@ The table below shows this endpoint's support for
index will not delete the key. If the index is non-zero, the key is only
deleted if the index matches the `ModifyIndex` of that key.
- `ns` `(string: "")` - **Enterprise Only** Specifies the namespace to query.
If not provided, the namespace will be inferred from the request's ACL token,
or will default to the `default` namespace. This is specified as part of the
URL as a query parameter.
### Sample Request
```text

View File

@ -34,3 +34,7 @@
instead of one specified via the `-token` argument or `CONSUL_HTTP_TOKEN`
environment variable. This can also be specified via the
`CONSUL_HTTP_TOKEN_FILE` environment variable.
* `-ns=<value>` - Specifies the namespace to query.
If not provided, the namespace will be inferred from the request's ACL token,
or will default to the `default` namespace.