Adds internal endpoint read ACL support and full unit tests.

This commit is contained in:
James Phillips 2016-05-10 11:23:47 -07:00
parent 2f51926852
commit fcb0c20867
2 changed files with 290 additions and 5 deletions

View File

@ -30,6 +30,14 @@ func (k *KVS) preApply(acl acl.ACL, op structs.KVSOp, dirEnt *structs.DirEntry)
if !acl.KeyWritePrefix(dirEnt.Key) {
return false, permissionDeniedErr
}
case structs.KVSAtomicGet,
structs.KVSAtomicCheckSession,
structs.KVSAtomicCheckIndex:
if !acl.KeyRead(dirEnt.Key) {
return false, permissionDeniedErr
}
default:
if !acl.KeyWrite(dirEnt.Key) {
return false, permissionDeniedErr

View File

@ -1,7 +1,9 @@
package consul
import (
"bytes"
"os"
"reflect"
"strings"
"testing"
"time"
@ -129,6 +131,198 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
}
}
func TestKVS_AtomicApply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "test",
},
},
},
}
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the state store directly.
state := s1.fsm.State()
_, d, err := state.KVSGet("test")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should not be nil")
}
if d.Flags != 42 ||
!bytes.Equal(d.Value, []byte("test")) {
t.Fatalf("bad: %v", d)
}
// Verify the transaction's return value.
expected := structs.KVSAtomicResponse{
Results: structs.DirEntries{
&structs.DirEntry{
Key: "test",
Flags: 42,
Value: nil,
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
&structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestKVS_AtomicApply_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL.
var id string
{
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testListRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a transaction where every operation should get blocked due to
// ACLs.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
WriteRequest: structs.WriteRequest{Token: id},
}
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
var expected structs.KVSAtomicResponse
for i, _ := range arg.Ops {
expected.Errors = append(expected.Errors, &structs.KVSAtomicError{i, permissionDeniedErr.Error()})
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestKVS_Get(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -627,7 +821,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock
// Create and invalidate a session with a lock.
state := s1.fsm.State()
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
@ -652,13 +846,13 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid
// Make a new session that is valid.
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request
// Make a lock request.
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSLock,
@ -675,10 +869,10 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
t.Fatalf("should not acquire")
}
// Wait for lock-delay
// Wait for lock-delay.
time.Sleep(50 * time.Millisecond)
// Should acquire
// Should acquire.
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -687,6 +881,89 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
}
}
func TestKVS_AtomicApply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock.
state := s1.fsm.State()
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := state.SessionCreate(2, session); err != nil {
t.Fatalf("err: %v", err)
}
id := session.ID
d := &structs.DirEntry{
Key: "test",
Session: id,
}
if ok, err := state.KVSLock(3, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
if err := state.SessionDestroy(4, id); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid.
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request via an atomic transaction.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "test",
Session: validId,
},
},
},
}
{
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 0 ||
len(out.Errors) != 1 ||
out.Errors[0].OpIndex != 0 ||
!strings.Contains(out.Errors[0].What, "due to lock delay") {
t.Fatalf("bad: %v", out)
}
}
// Wait for lock-delay.
time.Sleep(50 * time.Millisecond)
// Should acquire.
{
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 1 ||
len(out.Errors) != 0 ||
out.Results[0].LockIndex != 2 {
t.Fatalf("bad: %v", out)
}
}
}
func TestKVS_Issue_1626(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)