mirror of https://github.com/status-im/consul.git
consul: Pulling in ACLs
This commit is contained in:
parent
1abfd6c050
commit
97a737b1ee
|
@ -0,0 +1,79 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
)
|
||||||
|
|
||||||
|
// aclCacheEntry is used to cache non-authoritative ACL's
|
||||||
|
// If non-authoritative, then we must respect a TTL
|
||||||
|
type aclCacheEntry struct {
|
||||||
|
ACL acl.ACL
|
||||||
|
TTL time.Duration
|
||||||
|
Expires time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// aclFault is used to fault in the rules for an ACL if we take a miss
|
||||||
|
func (s *Server) aclFault(id string) (string, error) {
|
||||||
|
state := s.fsm.State()
|
||||||
|
_, acl, err := state.ACLGet(id)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if acl == nil {
|
||||||
|
return "", fmt.Errorf("ACL not found: %s", id)
|
||||||
|
}
|
||||||
|
return acl.Rules, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolveToken is used to resolve an ACL is any is appropriate
|
||||||
|
func (s *Server) resolveToken(id string) (acl.ACL, error) {
|
||||||
|
// Check if there is no ACL datacenter (ACL's disabled)
|
||||||
|
authDC := s.config.ACLDatacenter
|
||||||
|
if authDC == "" {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we are the ACL datacenter and the leader, use the
|
||||||
|
// authoritative cache
|
||||||
|
if s.config.Datacenter == authDC && s.IsLeader() {
|
||||||
|
return s.aclAuthCache.GetACL(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use our non-authoritative cache
|
||||||
|
return s.lookupACL(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// lookupACL is used when we are non-authoritative, and need
|
||||||
|
// to resolve an ACL
|
||||||
|
func (s *Server) lookupACL(id string) (acl.ACL, error) {
|
||||||
|
// Check the cache for the ACL
|
||||||
|
var cached *aclCacheEntry
|
||||||
|
raw, ok := s.aclCache.Get(id)
|
||||||
|
if ok {
|
||||||
|
cached = raw.(*aclCacheEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for live cache
|
||||||
|
if cached != nil && time.Now().Before(cached.Expires) {
|
||||||
|
return cached.ACL, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to refresh the policy
|
||||||
|
// TODO: GetPolicy...
|
||||||
|
|
||||||
|
// Unable to refresh, apply the down policy
|
||||||
|
switch s.config.ACLDownPolicy {
|
||||||
|
case "allow":
|
||||||
|
return acl.AllowAll(), nil
|
||||||
|
case "extend-cache":
|
||||||
|
if cached != nil {
|
||||||
|
return cached.ACL, nil
|
||||||
|
}
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return acl.DenyAll(), nil
|
||||||
|
}
|
||||||
|
}
|
|
@ -81,6 +81,26 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPolicy is used to retrieve a compiled policy object with a TTL. Does not
|
||||||
|
// support a blocking query.
|
||||||
|
func (a *ACL) GetPolicy(args *structs.ACLSpecificRequest, reply *structs.ACLPolicy) error {
|
||||||
|
if done, err := a.srv.forward("ACL.GetPolicy", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the policy via the cache
|
||||||
|
policy, err := a.srv.aclAuthCache.GetACLPolicy(args.ACL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the response
|
||||||
|
reply.Policy = policy
|
||||||
|
reply.TTL = a.srv.config.ACLTTL
|
||||||
|
a.srv.setQueryMeta(&reply.QueryMeta)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// List is used to list all the ACLs
|
// List is used to list all the ACLs
|
||||||
func (a *ACL) List(args *structs.DCSpecificRequest,
|
func (a *ACL) List(args *structs.DCSpecificRequest,
|
||||||
reply *structs.IndexedACLs) error {
|
reply *structs.IndexedACLs) error {
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
|
||||||
"github.com/hashicorp/consul/testutil"
|
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"github.com/hashicorp/consul/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestACLEndpoint_Apply(t *testing.T) {
|
func TestACLEndpoint_Apply(t *testing.T) {
|
||||||
|
@ -106,6 +108,45 @@ func TestACLEndpoint_Get(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestACLEndpoint_GetPolicy(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client := rpcClient(t, s1)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||||
|
|
||||||
|
arg := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out string
|
||||||
|
if err := client.Call("ACL.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
getR := structs.ACLSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ACL: out,
|
||||||
|
}
|
||||||
|
var acls structs.ACLPolicy
|
||||||
|
if err := client.Call("ACL.GetPolicy", &getR, &acls); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if acls.Policy == nil {
|
||||||
|
t.Fatalf("Bad: %v", acls)
|
||||||
|
}
|
||||||
|
if acls.TTL != 30*time.Second {
|
||||||
|
t.Fatalf("bad: %v", acls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestACLEndpoint_List(t *testing.T) {
|
func TestACLEndpoint_List(t *testing.T) {
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
|
|
|
@ -15,6 +15,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/golang-lru"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"github.com/hashicorp/raft-mdb"
|
"github.com/hashicorp/raft-mdb"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
|
@ -43,11 +45,21 @@ const (
|
||||||
// serverMaxStreams controsl how many idle streams we keep
|
// serverMaxStreams controsl how many idle streams we keep
|
||||||
// open to a server
|
// open to a server
|
||||||
serverMaxStreams = 64
|
serverMaxStreams = 64
|
||||||
|
|
||||||
|
// Maximum number of cached ACL entries
|
||||||
|
aclCacheSize = 256
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server is Consul server which manages the service discovery,
|
// Server is Consul server which manages the service discovery,
|
||||||
// health checking, DC forwarding, Raft, and multiple Serf pools.
|
// health checking, DC forwarding, Raft, and multiple Serf pools.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
|
// aclAuthCache is the authoritative ACL cache
|
||||||
|
aclAuthCache *acl.Cache
|
||||||
|
|
||||||
|
// aclCache is a non-authoritative ACL cache
|
||||||
|
aclCache *lru.Cache
|
||||||
|
|
||||||
|
// Consul configuration
|
||||||
config *Config
|
config *Config
|
||||||
|
|
||||||
// Connection pool to other consul servers
|
// Connection pool to other consul servers
|
||||||
|
@ -181,6 +193,29 @@ func NewServer(config *Config) (*Server, error) {
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Determine the ACL root policy
|
||||||
|
var aclRoot acl.ACL
|
||||||
|
switch config.ACLDefaultPolicy {
|
||||||
|
case "allow":
|
||||||
|
aclRoot = acl.AllowAll()
|
||||||
|
case "deny":
|
||||||
|
aclRoot = acl.DenyAll()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the authoritative ACL cache
|
||||||
|
s.aclAuthCache, err = acl.NewCache(aclCacheSize, aclRoot, s.aclFault)
|
||||||
|
if err != nil {
|
||||||
|
s.Shutdown()
|
||||||
|
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the non-authoritative ACL cache
|
||||||
|
s.aclCache, err = lru.New(aclCacheSize)
|
||||||
|
if err != nil {
|
||||||
|
s.Shutdown()
|
||||||
|
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize the RPC layer
|
// Initialize the RPC layer
|
||||||
if err := s.setupRPC(tlsConfig); err != nil {
|
if err := s.setupRPC(tlsConfig); err != nil {
|
||||||
s.Shutdown()
|
s.Shutdown()
|
||||||
|
|
|
@ -3,8 +3,10 @@ package structs
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/ugorji/go/codec"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/ugorji/go/codec"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -469,6 +471,12 @@ type IndexedACLs struct {
|
||||||
QueryMeta
|
QueryMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ACLPolicy struct {
|
||||||
|
Policy *acl.Policy
|
||||||
|
TTL time.Duration
|
||||||
|
QueryMeta
|
||||||
|
}
|
||||||
|
|
||||||
// msgpackHandle is a shared handle for encoding/decoding of structs
|
// msgpackHandle is a shared handle for encoding/decoding of structs
|
||||||
var msgpackHandle = &codec.MsgpackHandle{}
|
var msgpackHandle = &codec.MsgpackHandle{}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue