diff --git a/consul/client.go b/consul/client.go index a00bc9a6e2..6cfeb98712 100644 --- a/consul/client.go +++ b/consul/client.go @@ -9,6 +9,7 @@ import ( "net" "os" "path/filepath" + "strings" "sync" "time" ) @@ -178,6 +179,7 @@ func (c *Client) lanEventHandler() { case serf.EventMemberFailed: c.nodeFail(e.(serf.MemberEvent)) case serf.EventUser: + c.localEvent(e.(serf.UserEvent)) default: c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) } @@ -250,6 +252,26 @@ func (c *Client) nodeFail(me serf.MemberEvent) { } } +// localEvent is called when we receive an event on the local Serf +func (c *Client) localEvent(event serf.UserEvent) { + // Handle only consul events + if !strings.HasPrefix(event.Name, "consul:") { + return + } + + switch event.Name { + case newLeaderEvent: + c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload) + + // Trigger the callback + if c.config.ServerUp != nil { + c.config.ServerUp() + } + default: + c.logger.Printf("[WARN] consul: Unhandled local event: %v", event) + } +} + // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Check the last rpc time diff --git a/consul/leader.go b/consul/leader.go index 6a28bfdff4..a527056373 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -13,6 +13,7 @@ const ( SerfCheckName = "Serf Health Status" ConsulServiceID = "consul" ConsulServiceName = "consul" + newLeaderEvent = "consul:new-leader" ) // monitorLeadership is used to monitor if we acquire or lose our role @@ -42,6 +43,12 @@ func (s *Server) monitorLeadership() { // leaderLoop runs as long as we are the leader to run various // maintence activities func (s *Server) leaderLoop(stopCh chan struct{}) { + // Fire a user event indicating a new leader + payload := []byte(s.config.NodeName) + if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { + s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err) + } + // Reconcile channel is only used once initial reconcile // has succeeded var reconcileCh chan serf.Member diff --git a/consul/serf.go b/consul/serf.go index 61e92b4b37..4e03661dbe 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -3,6 +3,7 @@ package consul import ( "github.com/hashicorp/serf/serf" "net" + "strings" ) // lanEventHandler is used to handle events from the lan Serf cluster @@ -18,6 +19,7 @@ func (s *Server) lanEventHandler() { case serf.EventMemberFailed: s.localMemberEvent(e.(serf.MemberEvent)) case serf.EventUser: + s.localEvent(e.(serf.UserEvent)) default: s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) } @@ -68,6 +70,26 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) { } } +// localEvent is called when we receive an event on the local Serf +func (s *Server) localEvent(event serf.UserEvent) { + // Handle only consul events + if !strings.HasPrefix(event.Name, "consul:") { + return + } + + switch event.Name { + case newLeaderEvent: + s.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload) + + // Trigger the callback + if s.config.ServerUp != nil { + s.config.ServerUp() + } + default: + s.logger.Printf("[WARN] consul: Unhandled local event: %v", event) + } +} + // remoteJoin is used to handle join events on the wan serf cluster func (s *Server) remoteJoin(me serf.MemberEvent) { for _, m := range me.Members {