diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index bb8c956bf3..9250a6ac01 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -30,6 +30,14 @@ func (k *KVS) preApply(acl acl.ACL, op structs.KVSOp, dirEnt *structs.DirEntry) if !acl.KeyWritePrefix(dirEnt.Key) { return false, permissionDeniedErr } + + case structs.KVSAtomicGet, + structs.KVSAtomicCheckSession, + structs.KVSAtomicCheckIndex: + if !acl.KeyRead(dirEnt.Key) { + return false, permissionDeniedErr + } + default: if !acl.KeyWrite(dirEnt.Key) { return false, permissionDeniedErr diff --git a/consul/kvs_endpoint_test.go b/consul/kvs_endpoint_test.go index 38d2b5f084..08e3460519 100644 --- a/consul/kvs_endpoint_test.go +++ b/consul/kvs_endpoint_test.go @@ -1,7 +1,9 @@ package consul import ( + "bytes" "os" + "reflect" "strings" "testing" "time" @@ -129,6 +131,198 @@ func TestKVS_Apply_ACLDeny(t *testing.T) { } } +func TestKVS_AtomicApply(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Do a super basic request. The state store test covers the details so + // we just need to be sure that the transaction is sent correctly and + // the results are converted appropriately. + arg := structs.KVSAtomicRequest{ + Datacenter: "dc1", + Ops: structs.KVSAtomicOps{ + &structs.KVSAtomicOp{ + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicGet, + DirEnt: structs.DirEntry{ + Key: "test", + }, + }, + }, + } + var out structs.KVSAtomicResponse + if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify the state store directly. + state := s1.fsm.State() + _, d, err := state.KVSGet("test") + if err != nil { + t.Fatalf("err: %v", err) + } + if d == nil { + t.Fatalf("should not be nil") + } + if d.Flags != 42 || + !bytes.Equal(d.Value, []byte("test")) { + t.Fatalf("bad: %v", d) + } + + // Verify the transaction's return value. + expected := structs.KVSAtomicResponse{ + Results: structs.DirEntries{ + &structs.DirEntry{ + Key: "test", + Flags: 42, + Value: nil, + RaftIndex: structs.RaftIndex{ + CreateIndex: d.CreateIndex, + ModifyIndex: d.ModifyIndex, + }, + }, + &structs.DirEntry{ + Key: "test", + Flags: 42, + Value: []byte("test"), + RaftIndex: structs.RaftIndex{ + CreateIndex: d.CreateIndex, + ModifyIndex: d.ModifyIndex, + }, + }, + }, + } + if !reflect.DeepEqual(out, expected) { + t.Fatalf("bad %v", out) + } +} + +func TestKVS_AtomicApply_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Create the ACL. + var id string + { + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testListRules, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Set up a transaction where every operation should get blocked due to + // ACLs. + arg := structs.KVSAtomicRequest{ + Datacenter: "dc1", + Ops: structs.KVSAtomicOps{ + &structs.KVSAtomicOp{ + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSDelete, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSDeleteCAS, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSDeleteTree, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSCAS, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSUnlock, + DirEnt: structs.DirEntry{ + Key: "foo", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicGet, + DirEnt: structs.DirEntry{ + Key: "nope", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckSession, + DirEnt: structs.DirEntry{ + Key: "nope", + }, + }, + &structs.KVSAtomicOp{ + Op: structs.KVSAtomicCheckIndex, + DirEnt: structs.DirEntry{ + Key: "nope", + }, + }, + }, + WriteRequest: structs.WriteRequest{Token: id}, + } + var out structs.KVSAtomicResponse + if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify the transaction's return value. + var expected structs.KVSAtomicResponse + for i, _ := range arg.Ops { + expected.Errors = append(expected.Errors, &structs.KVSAtomicError{i, permissionDeniedErr.Error()}) + } + if !reflect.DeepEqual(out, expected) { + t.Fatalf("bad %v", out) + } +} + func TestKVS_Get(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) @@ -627,7 +821,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) { testutil.WaitForLeader(t, s1.RPC, "dc1") - // Create and invalidate a session with a lock + // Create and invalidate a session with a lock. state := s1.fsm.State() if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) @@ -652,13 +846,13 @@ func TestKVS_Apply_LockDelay(t *testing.T) { t.Fatalf("err: %v", err) } - // Make a new session that is valid + // Make a new session that is valid. if err := state.SessionCreate(5, session); err != nil { t.Fatalf("err: %v", err) } validId := session.ID - // Make a lock request + // Make a lock request. arg := structs.KVSRequest{ Datacenter: "dc1", Op: structs.KVSLock, @@ -675,10 +869,10 @@ func TestKVS_Apply_LockDelay(t *testing.T) { t.Fatalf("should not acquire") } - // Wait for lock-delay + // Wait for lock-delay. time.Sleep(50 * time.Millisecond) - // Should acquire + // Should acquire. if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } @@ -687,6 +881,89 @@ func TestKVS_Apply_LockDelay(t *testing.T) { } } +func TestKVS_AtomicApply_LockDelay(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Create and invalidate a session with a lock. + state := s1.fsm.State() + if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { + t.Fatalf("err: %v", err) + } + session := &structs.Session{ + ID: generateUUID(), + Node: "foo", + LockDelay: 50 * time.Millisecond, + } + if err := state.SessionCreate(2, session); err != nil { + t.Fatalf("err: %v", err) + } + id := session.ID + d := &structs.DirEntry{ + Key: "test", + Session: id, + } + if ok, err := state.KVSLock(3, d); err != nil || !ok { + t.Fatalf("err: %v", err) + } + if err := state.SessionDestroy(4, id); err != nil { + t.Fatalf("err: %v", err) + } + + // Make a new session that is valid. + if err := state.SessionCreate(5, session); err != nil { + t.Fatalf("err: %v", err) + } + validId := session.ID + + // Make a lock request via an atomic transaction. + arg := structs.KVSAtomicRequest{ + Datacenter: "dc1", + Ops: structs.KVSAtomicOps{ + &structs.KVSAtomicOp{ + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: "test", + Session: validId, + }, + }, + }, + } + { + var out structs.KVSAtomicResponse + if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.Results) != 0 || + len(out.Errors) != 1 || + out.Errors[0].OpIndex != 0 || + !strings.Contains(out.Errors[0].What, "due to lock delay") { + t.Fatalf("bad: %v", out) + } + } + + // Wait for lock-delay. + time.Sleep(50 * time.Millisecond) + + // Should acquire. + { + var out structs.KVSAtomicResponse + if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.Results) != 1 || + len(out.Errors) != 0 || + out.Results[0].LockIndex != 2 { + t.Fatalf("bad: %v", out) + } + } +} + func TestKVS_Issue_1626(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1)