202 lines
6.2 KiB
Go
Raw Normal View History

package autopilot
//
// The methods in this file are all mainly to provide synchronous methods
// for Raft operations that would normally return futures.
//
import (
"fmt"
"strconv"
"github.com/hashicorp/raft"
)
func requiredQuorum(voters int) int {
return (voters / 2) + 1
}
// NumVoters is a helper for calculating the number of voting peers in the
// current raft configuration. This function ignores any autopilot state
// and will make the calculation based on a newly retrieved Raft configuration.
func (a *Autopilot) NumVoters() (int, error) {
cfg, err := a.getRaftConfiguration()
if err != nil {
return 0, err
}
var numVoters int
for _, server := range cfg.Servers {
if server.Suffrage == raft.Voter {
numVoters++
}
}
return numVoters, nil
}
// AddServer is a helper for adding a new server to the raft configuration.
// This may remove servers with duplicate addresses or ids first and after
// its all done will trigger autopilot to remove dead servers if there
// are any. Servers added by this method will start in a non-voting
// state and later on autopilot will promote them to voting status
// if desired by the configured promoter. If too many removals would
// be required that would cause leadership loss then an error is returned
// instead of performing any Raft configuration changes.
func (a *Autopilot) AddServer(s *Server) error {
cfg, err := a.getRaftConfiguration()
if err != nil {
a.logger.Error("failed to get raft configuration", "error", err)
return err
}
var existingVoter bool
var voterRemovals []raft.ServerID
var nonVoterRemovals []raft.ServerID
var numVoters int
for _, server := range cfg.Servers {
if server.Suffrage == raft.Voter {
numVoters++
}
if server.Address == s.Address && server.ID == s.ID {
// nothing to be done as the addr and ID both already match
return nil
} else if server.ID == s.ID {
// special case for address updates only. In this case we should be
// able to update the configuration without have to first remove the server
if server.Suffrage == raft.Voter || server.Suffrage == raft.Staging {
existingVoter = true
}
} else if server.Address == s.Address {
if server.Suffrage == raft.Voter {
voterRemovals = append(voterRemovals, server.ID)
} else {
nonVoterRemovals = append(nonVoterRemovals, server.ID)
}
}
}
requiredVoters := requiredQuorum(numVoters)
if len(voterRemovals) > numVoters-requiredVoters {
return fmt.Errorf("Preventing server addition that would require removal of too many servers and cause cluster instability")
}
for _, id := range voterRemovals {
if err := a.removeServer(id); err != nil {
return fmt.Errorf("error removing server %q with duplicate address %q: %w", id, s.Address, err)
}
a.logger.Info("removed server with duplicate address", "address", s.Address)
}
for _, id := range nonVoterRemovals {
if err := a.removeServer(id); err != nil {
return fmt.Errorf("error removing server %q with duplicate address %q: %w", id, s.Address, err)
}
a.logger.Info("removed server with duplicate address", "address", s.Address)
}
if existingVoter {
if err := a.addVoter(s.ID, s.Address); err != nil {
return err
}
} else {
if err := a.addNonVoter(s.ID, s.Address); err != nil {
return err
}
}
// Trigger a check to remove dead servers
a.RemoveDeadServers()
return nil
}
// RemoveServer is a helper to remove a server from Raft if it
// exists in the latest Raft configuration
func (a *Autopilot) RemoveServer(id raft.ServerID) error {
cfg, err := a.getRaftConfiguration()
if err != nil {
a.logger.Error("failed to get raft configuration", "error", err)
return err
}
// only remove servers currently in the configuration
for _, server := range cfg.Servers {
if server.ID == id {
return a.removeServer(server.ID)
}
}
return nil
}
// addNonVoter is a wrapper around calling the AddNonVoter method on the Raft
// interface object provided to Autopilot
func (a *Autopilot) addNonVoter(id raft.ServerID, addr raft.ServerAddress) error {
addFuture := a.raft.AddNonvoter(id, addr, 0, 0)
if err := addFuture.Error(); err != nil {
a.logger.Error("failed to add raft non-voting peer", "id", id, "address", addr, "error", err)
return err
}
return nil
}
// addVoter is a wrapper around calling the AddVoter method on the Raft
// interface object provided to Autopilot
func (a *Autopilot) addVoter(id raft.ServerID, addr raft.ServerAddress) error {
addFuture := a.raft.AddVoter(id, addr, 0, 0)
if err := addFuture.Error(); err != nil {
a.logger.Error("failed to add raft voting peer", "id", id, "address", addr, "error", err)
return err
}
return nil
}
func (a *Autopilot) demoteVoter(id raft.ServerID) error {
removeFuture := a.raft.DemoteVoter(id, 0, 0)
if err := removeFuture.Error(); err != nil {
a.logger.Error("failed to demote raft peer", "id", id, "error", err)
return err
}
return nil
}
// removeServer is a wrapper around calling the RemoveServer method on the
// Raft interface object provided to Autopilot
func (a *Autopilot) removeServer(id raft.ServerID) error {
a.logger.Debug("removing server by ID", "id", id)
future := a.raft.RemoveServer(id, 0, 0)
if err := future.Error(); err != nil {
a.logger.Error("failed to remove raft server",
"id", id,
"error", err,
)
return err
}
a.logger.Info("removed server", "id", id)
return nil
}
// getRaftConfiguration a wrapper arond calling the GetConfiguration method
// on the Raft interface object provided to Autopilot
func (a *Autopilot) getRaftConfiguration() (*raft.Configuration, error) {
configFuture := a.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
return nil, err
}
cfg := configFuture.Configuration()
return &cfg, nil
}
// lastTerm will retrieve the raft stats and then pull the last term value out of it
func (a *Autopilot) lastTerm() (uint64, error) {
return strconv.ParseUint(a.raft.Stats()["last_log_term"], 10, 64)
}
// leadershipTransfer will transfer leadership to the server with the specified id and address
func (a *Autopilot) leadershipTransfer(id raft.ServerID, address raft.ServerAddress) error {
a.logger.Info("Transferring leadership to new server", "id", id, "address", address)
future := a.raft.LeadershipTransferToServer(id, address)
return future.Error()
}