mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 03:29:43 +00:00
Adds an ACL replication status endpoint.
This commit is contained in:
parent
734cc0b3d5
commit
d29af2ddc7
@ -205,3 +205,20 @@ func (s *HTTPServer) ACLList(resp http.ResponseWriter, req *http.Request) (inter
|
||||
}
|
||||
return out.ACLs, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) ACLReplicationStatus(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
// Note that we do not forward to the ACL DC here. This is a query for
|
||||
// any DC that's doing replication.
|
||||
args := structs.DCSpecificRequest{}
|
||||
s.parseSource(req, &args.Source)
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Make the request.
|
||||
var out structs.ACLReplicationStatus
|
||||
if err := s.agent.RPC("ACL.ReplicationStatus", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
@ -218,3 +218,18 @@ func TestACLList(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestACLReplicationStatus(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
req, err := http.NewRequest("GET", "/v1/acl/replication", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.ACLReplicationStatus(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
_, ok := obj.(structs.ACLReplicationStatus)
|
||||
if !ok {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -257,6 +257,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
||||
s.mux.HandleFunc("/v1/acl/info/", s.wrap(s.ACLGet))
|
||||
s.mux.HandleFunc("/v1/acl/clone/", s.wrap(s.ACLClone))
|
||||
s.mux.HandleFunc("/v1/acl/list", s.wrap(s.ACLList))
|
||||
s.mux.HandleFunc("/v1/acl/replication", s.wrap(s.ACLReplicationStatus))
|
||||
} else {
|
||||
s.mux.HandleFunc("/v1/acl/create", s.wrap(aclDisabled))
|
||||
s.mux.HandleFunc("/v1/acl/update", s.wrap(aclDisabled))
|
||||
@ -264,6 +265,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
|
||||
s.mux.HandleFunc("/v1/acl/info/", s.wrap(aclDisabled))
|
||||
s.mux.HandleFunc("/v1/acl/clone/", s.wrap(aclDisabled))
|
||||
s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled))
|
||||
s.mux.HandleFunc("/v1/acl/replication", s.wrap(aclDisabled))
|
||||
}
|
||||
|
||||
s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
|
||||
|
@ -235,3 +235,25 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ReplicationStatus is used to retrieve the current ACL replication status.
|
||||
func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest,
|
||||
reply *structs.ACLReplicationStatus) error {
|
||||
// This must be sent to the leader, so we fix the args since we are
|
||||
// re-using a structure where we don't support all the options.
|
||||
args.RequireConsistent = true
|
||||
args.AllowStale = false
|
||||
if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// There's no ACL token required here since this doesn't leak any
|
||||
// sensitive information, and we don't want people to have to use
|
||||
// management tokens if they are querying this via a health check.
|
||||
|
||||
// Poll the latest status.
|
||||
a.srv.aclReplicationStatusLock.RLock()
|
||||
*reply = a.srv.aclReplicationStatus
|
||||
a.srv.aclReplicationStatusLock.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
@ -466,3 +466,29 @@ func TestACLEndpoint_List_Denied(t *testing.T) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLEndpoint_ReplicationStatus(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc2"
|
||||
c.ACLReplicationToken = "secret"
|
||||
c.ACLReplicationInterval = 0
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
getR := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var status structs.ACLReplicationStatus
|
||||
err := msgpackrpc.CallWithCodec(codec, "ACL.ReplicationStatus", &getR, &status)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !status.Enabled || !status.Running || status.SourceDatacenter != "dc2" {
|
||||
t.Fatalf("bad: %#v", status)
|
||||
}
|
||||
}
|
||||
|
@ -256,9 +256,33 @@ func (s *Server) IsACLReplicationEnabled() bool {
|
||||
len(s.config.ACLReplicationToken) > 0
|
||||
}
|
||||
|
||||
// updateACLReplicationStatus safely updates the ACL replication status.
|
||||
func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) {
|
||||
// Fixup the times to shed some useless precision to ease formattting,
|
||||
// and always report UTC.
|
||||
status.LastError = status.LastError.Round(time.Second).UTC()
|
||||
status.LastSuccess = status.LastSuccess.Round(time.Second).UTC()
|
||||
|
||||
// Set the shared state.
|
||||
s.aclReplicationStatusLock.Lock()
|
||||
s.aclReplicationStatus = status
|
||||
s.aclReplicationStatusLock.Unlock()
|
||||
}
|
||||
|
||||
// runACLReplication is a long-running goroutine that will attempt to replicate
|
||||
// ACLs while the server is the leader, until the shutdown channel closes.
|
||||
func (s *Server) runACLReplication() {
|
||||
var status structs.ACLReplicationStatus
|
||||
status.Enabled = true
|
||||
status.SourceDatacenter = s.config.ACLDatacenter
|
||||
s.updateACLReplicationStatus(status)
|
||||
|
||||
// Show that it's not running on the way out.
|
||||
defer func() {
|
||||
status.Running = false
|
||||
s.updateACLReplicationStatus(status)
|
||||
}()
|
||||
|
||||
// Give each server's replicator a random initial phase for good
|
||||
// measure.
|
||||
select {
|
||||
@ -266,26 +290,39 @@ func (s *Server) runACLReplication() {
|
||||
case <-s.shutdownCh:
|
||||
}
|
||||
|
||||
// We are fairly conservative with the lastRemoteIndex so that after a
|
||||
// leadership change or an error we re-sync everything (we also don't
|
||||
// want to block the first time after one of these events so we can
|
||||
// show a successful sync in the status endpoint).
|
||||
var lastRemoteIndex uint64
|
||||
var wasActive bool
|
||||
replicate := func() {
|
||||
if !wasActive {
|
||||
if !status.Running {
|
||||
lastRemoteIndex = 0 // Re-sync everything.
|
||||
status.Running = true
|
||||
s.updateACLReplicationStatus(status)
|
||||
s.logger.Printf("[INFO] consul: ACL replication started")
|
||||
wasActive = true
|
||||
}
|
||||
|
||||
var err error
|
||||
lastRemoteIndex, err = s.replicateACLs(lastRemoteIndex)
|
||||
index, err := s.replicateACLs(lastRemoteIndex)
|
||||
if err != nil {
|
||||
lastRemoteIndex = 0 // Re-sync everything.
|
||||
status.LastError = time.Now()
|
||||
s.updateACLReplicationStatus(status)
|
||||
s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err)
|
||||
} else {
|
||||
s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", lastRemoteIndex)
|
||||
lastRemoteIndex = index
|
||||
status.ReplicatedIndex = index
|
||||
status.LastSuccess = time.Now()
|
||||
s.updateACLReplicationStatus(status)
|
||||
s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", index)
|
||||
}
|
||||
}
|
||||
pause := func() {
|
||||
if wasActive {
|
||||
if status.Running {
|
||||
lastRemoteIndex = 0 // Re-sync everything.
|
||||
status.Running = false
|
||||
s.updateACLReplicationStatus(status)
|
||||
s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)")
|
||||
wasActive = false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -364,7 +364,7 @@ func TestACLReplication(t *testing.T) {
|
||||
}
|
||||
|
||||
checkSame := func() (bool, error) {
|
||||
_, remote, err := s1.fsm.State().ACLList()
|
||||
index, remote, err := s1.fsm.State().ACLList()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -380,6 +380,17 @@ func TestACLReplication(t *testing.T) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
var status structs.ACLReplicationStatus
|
||||
s2.aclReplicationStatusLock.RLock()
|
||||
status = s2.aclReplicationStatus
|
||||
s2.aclReplicationStatusLock.RUnlock()
|
||||
if !status.Enabled || !status.Running ||
|
||||
status.ReplicatedIndex != index ||
|
||||
status.SourceDatacenter != "dc1" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/consul/agent"
|
||||
"github.com/hashicorp/consul/consul/state"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/raft-boltdb"
|
||||
@ -149,6 +150,11 @@ type Server struct {
|
||||
// for the KV tombstones
|
||||
tombstoneGC *state.TombstoneGC
|
||||
|
||||
// aclReplicationStatus (and its associated lock) provide information
|
||||
// about the health of the ACL replication goroutine.
|
||||
aclReplicationStatus structs.ACLReplicationStatus
|
||||
aclReplicationStatusLock sync.RWMutex
|
||||
|
||||
// shutdown and the associated members here are used in orchestrating
|
||||
// a clean shutdown. The shutdownCh is never written to, only closed to
|
||||
// indicate a shutdown has been initiated.
|
||||
|
@ -748,6 +748,17 @@ type ACLPolicy struct {
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// ACLReplicationStatus provides information about the health of the ACL
|
||||
// replication system.
|
||||
type ACLReplicationStatus struct {
|
||||
Enabled bool
|
||||
Running bool
|
||||
SourceDatacenter string
|
||||
ReplicatedIndex uint64
|
||||
LastSuccess time.Time
|
||||
LastError time.Time
|
||||
}
|
||||
|
||||
// Coordinate stores a node name with its associated network coordinate.
|
||||
type Coordinate struct {
|
||||
Node string
|
||||
|
Loading…
x
Reference in New Issue
Block a user