mirror of
https://github.com/status-im/consul.git
synced 2025-01-20 18:50:04 +00:00
consul: remove explicit leave, use reconciliation
This commit is contained in:
parent
a31a19040e
commit
f16d13213c
@ -3,18 +3,10 @@
|
|||||||
Consul provides a few high-level services, each of which exposes
|
Consul provides a few high-level services, each of which exposes
|
||||||
methods. The services exposed are:
|
methods. The services exposed are:
|
||||||
|
|
||||||
* Raft : Used to manipulate Raft from non-leader nodes
|
|
||||||
* Status : Used to query status information
|
* Status : Used to query status information
|
||||||
* Catalog: Used to register, deregister, and query service information
|
* Catalog: Used to register, deregister, and query service information
|
||||||
* Health: Used to notify of health checks and changes to health
|
* Health: Used to notify of health checks and changes to health
|
||||||
|
|
||||||
## Raft Service
|
|
||||||
|
|
||||||
The Raft service is used to manipulate the Raft controls on the Leader
|
|
||||||
node. It is only for internal use. It exposes the following methods:
|
|
||||||
|
|
||||||
* RemovePeer: Used to remove a peer from the group
|
|
||||||
|
|
||||||
## Status Service
|
## Status Service
|
||||||
|
|
||||||
The status service is used to query for various status information
|
The status service is used to query for various status information
|
||||||
|
@ -1,25 +0,0 @@
|
|||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Raft endpoint is used to manipulate the Raft subsystem
|
|
||||||
type Raft struct {
|
|
||||||
server *Server
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Raft) RemovePeer(args string, reply *struct{}) error {
|
|
||||||
peer, err := net.ResolveTCPAddr("tcp", args)
|
|
||||||
if err != nil {
|
|
||||||
r.server.logger.Printf("[ERR] consul.raft: failed to parse peer: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
future := r.server.raft.RemovePeer(peer)
|
|
||||||
return future.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Raft) Snapshot(args struct{}, reply *struct{}) error {
|
|
||||||
future := r.server.raft.Snapshot()
|
|
||||||
return future.Error()
|
|
||||||
}
|
|
@ -113,7 +113,6 @@ type Server struct {
|
|||||||
type endpoints struct {
|
type endpoints struct {
|
||||||
Catalog *Catalog
|
Catalog *Catalog
|
||||||
Health *Health
|
Health *Health
|
||||||
Raft *Raft
|
|
||||||
Status *Status
|
Status *Status
|
||||||
KVS *KVS
|
KVS *KVS
|
||||||
Session *Session
|
Session *Session
|
||||||
@ -322,7 +321,6 @@ func (s *Server) setupRaft() error {
|
|||||||
func (s *Server) setupRPC(tlsConfig *tls.Config) error {
|
func (s *Server) setupRPC(tlsConfig *tls.Config) error {
|
||||||
// Create endpoints
|
// Create endpoints
|
||||||
s.endpoints.Status = &Status{s}
|
s.endpoints.Status = &Status{s}
|
||||||
s.endpoints.Raft = &Raft{s}
|
|
||||||
s.endpoints.Catalog = &Catalog{s}
|
s.endpoints.Catalog = &Catalog{s}
|
||||||
s.endpoints.Health = &Health{s}
|
s.endpoints.Health = &Health{s}
|
||||||
s.endpoints.KVS = &KVS{s}
|
s.endpoints.KVS = &KVS{s}
|
||||||
@ -331,7 +329,6 @@ func (s *Server) setupRPC(tlsConfig *tls.Config) error {
|
|||||||
|
|
||||||
// Register the handlers
|
// Register the handlers
|
||||||
s.rpcServer.Register(s.endpoints.Status)
|
s.rpcServer.Register(s.endpoints.Status)
|
||||||
s.rpcServer.Register(s.endpoints.Raft)
|
|
||||||
s.rpcServer.Register(s.endpoints.Catalog)
|
s.rpcServer.Register(s.endpoints.Catalog)
|
||||||
s.rpcServer.Register(s.endpoints.Health)
|
s.rpcServer.Register(s.endpoints.Health)
|
||||||
s.rpcServer.Register(s.endpoints.KVS)
|
s.rpcServer.Register(s.endpoints.KVS)
|
||||||
@ -437,45 +434,6 @@ func (s *Server) Leave() error {
|
|||||||
s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
|
s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Leave the Raft cluster
|
|
||||||
if s.raft != nil {
|
|
||||||
// Check if we have other raft nodes
|
|
||||||
peers, _ := s.raftPeers.Peers()
|
|
||||||
if len(peers) <= 1 {
|
|
||||||
s.logger.Printf("[WARN] consul: not leaving Raft cluster, no peers")
|
|
||||||
goto AFTER_LEAVE
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the leader
|
|
||||||
leader := s.raft.Leader()
|
|
||||||
if leader == nil {
|
|
||||||
s.logger.Printf("[ERR] consul: failed to leave Raft cluster: no leader")
|
|
||||||
goto AFTER_LEAVE
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request that we are removed
|
|
||||||
ch := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
var out struct{}
|
|
||||||
peer := s.raftTransport.LocalAddr().String()
|
|
||||||
// TODO: Correct version
|
|
||||||
err := s.connPool.RPC(leader, 1, "Raft.RemovePeer", peer, &out)
|
|
||||||
ch <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for the commit
|
|
||||||
select {
|
|
||||||
case err := <-ch:
|
|
||||||
// Ignore if we have already been deregistered by the leader
|
|
||||||
if err != nil && err.Error() != raft.ErrUnknownPeer.Error() {
|
|
||||||
s.logger.Printf("[ERR] consul: failed to leave Raft cluster: %v", err)
|
|
||||||
}
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
s.logger.Printf("[ERR] consul: timed out leaving Raft cluster")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
AFTER_LEAVE:
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user