consul: Adding telemetry

This commit is contained in:
Armon Dadgar 2014-02-20 15:16:26 -08:00
parent dca8c79576
commit 889297dc99
4 changed files with 13 additions and 1 deletions

View File

@ -2,7 +2,9 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"time"
) )
// Catalog endpoint is used to manipulate the service catalog // Catalog endpoint is used to manipulate the service catalog
@ -15,6 +17,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.Register", args.Datacenter, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "catalog", "register"}, time.Now())
// Verify the args // Verify the args
if args.Node == "" || args.Address == "" { if args.Node == "" || args.Address == "" {
@ -55,6 +58,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done { if done, err := c.srv.forward("Catalog.Deregister", args.Datacenter, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "catalog", "deregister"}, time.Now())
// Verify the args // Verify the args
if args.Node == "" { if args.Node == "" {

View File

@ -1,6 +1,7 @@
package consul package consul
import ( import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -59,11 +60,13 @@ RECONCILE:
interval := time.After(s.config.ReconcileInterval) interval := time.After(s.config.ReconcileInterval)
// Apply a raft barrier to ensure our FSM is caught up // Apply a raft barrier to ensure our FSM is caught up
start := time.Now()
barrier := s.raft.Barrier(0) barrier := s.raft.Barrier(0)
if err := barrier.Error(); err != nil { if err := barrier.Error(); err != nil {
s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err) s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
goto WAIT goto WAIT
} }
metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)
// Reconcile any missing data // Reconcile any missing data
if err := s.reconcile(); err != nil { if err := s.reconcile(); err != nil {
@ -97,6 +100,7 @@ WAIT:
// Mainly we need to ensure all live nodes are registered, all failed // Mainly we need to ensure all live nodes are registered, all failed
// nodes are marked as such, and all left nodes are de-registered. // nodes are marked as such, and all left nodes are de-registered.
func (s *Server) reconcile() (err error) { func (s *Server) reconcile() (err error) {
defer metrics.MeasureSince([]string{"consul", "leader", "reconcile"}, time.Now())
members := s.serfLAN.Members() members := s.serfLAN.Members()
for _, member := range members { for _, member := range members {
if err := s.reconcileMember(member); err != nil { if err := s.reconcileMember(member); err != nil {
@ -114,6 +118,7 @@ func (s *Server) reconcileMember(member serf.Member) error {
s.logger.Printf("[WARN] consul: skipping reconcile of node %v", member) s.logger.Printf("[WARN] consul: skipping reconcile of node %v", member)
return nil return nil
} }
defer metrics.MeasureSince([]string{"consul", "leader", "reconcileMember"}, time.Now())
var err error var err error
switch member.Status { switch member.Status {
case serf.StatusAlive: case serf.StatusAlive:

View File

@ -2,6 +2,7 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/inconshreveable/muxado" "github.com/inconshreveable/muxado"
"github.com/ugorji/go/codec" "github.com/ugorji/go/codec"
@ -43,6 +44,7 @@ func (s *Server) listen() {
s.rpcClientLock.Unlock() s.rpcClientLock.Unlock()
go s.handleConn(conn) go s.handleConn(conn)
metrics.IncrCounter([]string{"consul", "rpc", "accept_conn"}, 1)
} }
} }
@ -63,6 +65,7 @@ func (s *Server) handleConn(conn net.Conn) {
s.handleConsulConn(conn) s.handleConsulConn(conn)
case rpcRaft: case rpcRaft:
metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1)
s.raftLayer.Handoff(conn) s.raftLayer.Handoff(conn)
case rpcMultiplex: case rpcMultiplex:
@ -154,6 +157,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
s.remoteLock.RUnlock() s.remoteLock.RUnlock()
// Forward to remote Consul // Forward to remote Consul
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
return s.connPool.RPC(server, method, args, reply) return s.connPool.RPC(server, method, args, reply)
} }

View File

@ -17,7 +17,6 @@ const (
serfLANSnapshot = "serf/local.snapshot" serfLANSnapshot = "serf/local.snapshot"
serfWANSnapshot = "serf/remote.snapshot" serfWANSnapshot = "serf/remote.snapshot"
raftState = "raft/" raftState = "raft/"
bootstrapFlag = "b"
) )
// Server is Consul server which manages the service discovery, // Server is Consul server which manages the service discovery,