From 225ae55e8305af3fb228272364120df87128dddc Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 14 Nov 2022 15:35:12 -0500 Subject: [PATCH] Leadership transfer cmd (#14132) * add leadership transfer command * add RPC call test (flaky) * add missing import * add changelog * add command registration * Apply suggestions from code review Co-authored-by: Matt Keeler * add the possibility of providing an id to raft leadership transfer. Add few tests. * delete old file from cherry pick * rename changelog filename to PR # * rename changelog and fix import * fix failing test * check for OperatorWrite Co-authored-by: Matt Keeler * rename from leader-transfer to transfer-leader * remove version check and add test for operator read * move struct to operator.go * first pass * add code for leader transfer in the grpc backend and tests * wire the http endpoint to the new grpc endpoint * remove the RPC endpoint * remove non needed struct * fix naming * add mog glue to API * fix comment * remove dead code * fix linter error * change package name for proto file * remove error wrapping * fix failing test * add command registration * add grpc service mock tests * fix receiver to be pointer * use defined values Co-authored-by: Matt Keeler * reuse MockAclAuthorizer * add documentation * remove usage of external.TokenFromContext * fix failing tests * fix proto generation * Apply suggestions from code review Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * Apply suggestions from code review * add more context in doc for the reason * Apply suggestions from docs code review Co-authored-by: Jeff Boruszak <104028618+boruszak@users.noreply.github.com> * regenerate proto * fix linter errors Co-authored-by: github-team-consul-core Co-authored-by: Matt Keeler Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> Co-authored-by: Jeff Boruszak <104028618+boruszak@users.noreply.github.com> --- .changelog/14132.txt | 3 + acl/MockAuthorizer.go | 223 ++++++++++++++++ acl/authorizer_test.go | 223 +--------------- acl/errors_test.go | 2 +- agent/agent.go | 4 + agent/consul/operator_backend.go | 34 +++ agent/consul/operator_backend_test.go | 193 ++++++++++++++ agent/consul/operator_raft_endpoint.go | 1 + agent/consul/server.go | 52 +++- agent/http_register.go | 1 + agent/operator_endpoint.go | 39 +++ agent/rpc/operator/service.go | 103 ++++++++ agent/rpc/operator/service_test.go | 104 ++++++++ api/operator_raft.go | 25 ++ api/operator_raft_test.go | 18 ++ .../raft/transferleader/transfer_leader.go | 90 +++++++ .../transferleader/transfer_leader_test.go | 43 ++++ command/registry.go | 2 + proto/pboperator/operator.gen.go | 18 ++ proto/pboperator/operator.pb.binary.go | 28 ++ proto/pboperator/operator.pb.go | 242 ++++++++++++++++++ proto/pboperator/operator.proto | 24 ++ proto/pboperator/operator_grpc.pb.go | 105 ++++++++ website/content/api-docs/operator/raft.mdx | 34 +++ website/content/commands/operator/raft.mdx | 23 ++ 25 files changed, 1397 insertions(+), 237 deletions(-) create mode 100644 .changelog/14132.txt create mode 100644 acl/MockAuthorizer.go create mode 100644 agent/consul/operator_backend.go create mode 100644 agent/consul/operator_backend_test.go create mode 100644 agent/rpc/operator/service.go create mode 100644 agent/rpc/operator/service_test.go create mode 100644 command/operator/raft/transferleader/transfer_leader.go create mode 100644 command/operator/raft/transferleader/transfer_leader_test.go create mode 100644 proto/pboperator/operator.gen.go create mode 100644 proto/pboperator/operator.pb.binary.go create mode 100644 proto/pboperator/operator.pb.go create mode 100644 proto/pboperator/operator.proto create mode 100644 proto/pboperator/operator_grpc.pb.go diff --git a/.changelog/14132.txt b/.changelog/14132.txt new file mode 100644 index 0000000000..7037f479d9 --- /dev/null +++ b/.changelog/14132.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +raft: add an operator api endpoint and a command to initiate raft leadership transfer. +``` diff --git a/acl/MockAuthorizer.go b/acl/MockAuthorizer.go new file mode 100644 index 0000000000..247cdb1151 --- /dev/null +++ b/acl/MockAuthorizer.go @@ -0,0 +1,223 @@ +package acl + +import "github.com/stretchr/testify/mock" + +type MockAuthorizer struct { + mock.Mock +} + +var _ Authorizer = (*MockAuthorizer)(nil) + +// ACLRead checks for permission to list all the ACLs +func (m *MockAuthorizer) ACLRead(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// ACLWrite checks for permission to manipulate ACLs +func (m *MockAuthorizer) ACLWrite(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// AgentRead checks for permission to read from agent endpoints for a +// given node. +func (m *MockAuthorizer) AgentRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// AgentWrite checks for permission to make changes via agent endpoints +// for a given node. +func (m *MockAuthorizer) AgentWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// EventRead determines if a specific event can be queried. +func (m *MockAuthorizer) EventRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// EventWrite determines if a specific event may be fired. +func (m *MockAuthorizer) EventWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// IntentionDefaultAllow determines the default authorized behavior +// when no intentions match a Connect request. +func (m *MockAuthorizer) IntentionDefaultAllow(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// IntentionRead determines if a specific intention can be read. +func (m *MockAuthorizer) IntentionRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// IntentionWrite determines if a specific intention can be +// created, modified, or deleted. +func (m *MockAuthorizer) IntentionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// KeyList checks for permission to list keys under a prefix +func (m *MockAuthorizer) KeyList(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// KeyRead checks for permission to read a given key +func (m *MockAuthorizer) KeyRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// KeyWrite checks for permission to write a given key +func (m *MockAuthorizer) KeyWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// KeyWritePrefix checks for permission to write to an +// entire key prefix. This means there must be no sub-policies +// that deny a write. +func (m *MockAuthorizer) KeyWritePrefix(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// KeyringRead determines if the encryption keyring used in +// the gossip layer can be read. +func (m *MockAuthorizer) KeyringRead(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// KeyringWrite determines if the keyring can be manipulated +func (m *MockAuthorizer) KeyringWrite(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// NodeRead checks for permission to read (discover) a given node. +func (m *MockAuthorizer) NodeRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +func (m *MockAuthorizer) NodeReadAll(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// NodeWrite checks for permission to create or update (register) a +// given node. +func (m *MockAuthorizer) NodeWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +func (m *MockAuthorizer) MeshRead(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +func (m *MockAuthorizer) MeshWrite(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// PeeringRead determines if the read-only Consul peering functions +// can be used. +func (m *MockAuthorizer) PeeringRead(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// PeeringWrite determines if the state-changing Consul peering +// functions can be used. +func (m *MockAuthorizer) PeeringWrite(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// OperatorRead determines if the read-only Consul operator functions +// can be used. ret := m.Called(segment, ctx) +func (m *MockAuthorizer) OperatorRead(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// OperatorWrite determines if the state-changing Consul operator +// functions can be used. +func (m *MockAuthorizer) OperatorWrite(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// PreparedQueryRead determines if a specific prepared query can be read +// to show its contents (this is not used for execution). +func (m *MockAuthorizer) PreparedQueryRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// PreparedQueryWrite determines if a specific prepared query can be +// created, modified, or deleted. +func (m *MockAuthorizer) PreparedQueryWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// ServiceRead checks for permission to read a given service +func (m *MockAuthorizer) ServiceRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +func (m *MockAuthorizer) ServiceReadAll(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// ServiceWrite checks for permission to create or update a given +// service +func (m *MockAuthorizer) ServiceWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// ServiceWriteAny checks for service:write on any service +func (m *MockAuthorizer) ServiceWriteAny(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +// SessionRead checks for permission to read sessions for a given node. +func (m *MockAuthorizer) SessionRead(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// SessionWrite checks for permission to create sessions for a given +// node. +func (m *MockAuthorizer) SessionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(segment, ctx) + return ret.Get(0).(EnforcementDecision) +} + +// Snapshot checks for permission to take and restore snapshots. +func (m *MockAuthorizer) Snapshot(ctx *AuthorizerContext) EnforcementDecision { + ret := m.Called(ctx) + return ret.Get(0).(EnforcementDecision) +} + +func (p *MockAuthorizer) ToAllowAuthorizer() AllowAuthorizer { + return AllowAuthorizer{Authorizer: p} +} diff --git a/acl/authorizer_test.go b/acl/authorizer_test.go index 03c0517a16..27a7aef4b9 100644 --- a/acl/authorizer_test.go +++ b/acl/authorizer_test.go @@ -4,230 +4,9 @@ import ( "fmt" "testing" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -type mockAuthorizer struct { - mock.Mock -} - -var _ Authorizer = (*mockAuthorizer)(nil) - -// ACLRead checks for permission to list all the ACLs -func (m *mockAuthorizer) ACLRead(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// ACLWrite checks for permission to manipulate ACLs -func (m *mockAuthorizer) ACLWrite(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// AgentRead checks for permission to read from agent endpoints for a -// given node. -func (m *mockAuthorizer) AgentRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// AgentWrite checks for permission to make changes via agent endpoints -// for a given node. -func (m *mockAuthorizer) AgentWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// EventRead determines if a specific event can be queried. -func (m *mockAuthorizer) EventRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// EventWrite determines if a specific event may be fired. -func (m *mockAuthorizer) EventWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// IntentionDefaultAllow determines the default authorized behavior -// when no intentions match a Connect request. -func (m *mockAuthorizer) IntentionDefaultAllow(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// IntentionRead determines if a specific intention can be read. -func (m *mockAuthorizer) IntentionRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// IntentionWrite determines if a specific intention can be -// created, modified, or deleted. -func (m *mockAuthorizer) IntentionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// KeyList checks for permission to list keys under a prefix -func (m *mockAuthorizer) KeyList(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// KeyRead checks for permission to read a given key -func (m *mockAuthorizer) KeyRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// KeyWrite checks for permission to write a given key -func (m *mockAuthorizer) KeyWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// KeyWritePrefix checks for permission to write to an -// entire key prefix. This means there must be no sub-policies -// that deny a write. -func (m *mockAuthorizer) KeyWritePrefix(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// KeyringRead determines if the encryption keyring used in -// the gossip layer can be read. -func (m *mockAuthorizer) KeyringRead(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// KeyringWrite determines if the keyring can be manipulated -func (m *mockAuthorizer) KeyringWrite(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// NodeRead checks for permission to read (discover) a given node. -func (m *mockAuthorizer) NodeRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -func (m *mockAuthorizer) NodeReadAll(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// NodeWrite checks for permission to create or update (register) a -// given node. -func (m *mockAuthorizer) NodeWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -func (m *mockAuthorizer) MeshRead(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -func (m *mockAuthorizer) MeshWrite(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// PeeringRead determines if the read-only Consul peering functions -// can be used. -func (m *mockAuthorizer) PeeringRead(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// PeeringWrite determines if the state-changing Consul peering -// functions can be used. -func (m *mockAuthorizer) PeeringWrite(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// OperatorRead determines if the read-only Consul operator functions -// can be used. ret := m.Called(segment, ctx) -func (m *mockAuthorizer) OperatorRead(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// OperatorWrite determines if the state-changing Consul operator -// functions can be used. -func (m *mockAuthorizer) OperatorWrite(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// PreparedQueryRead determines if a specific prepared query can be read -// to show its contents (this is not used for execution). -func (m *mockAuthorizer) PreparedQueryRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// PreparedQueryWrite determines if a specific prepared query can be -// created, modified, or deleted. -func (m *mockAuthorizer) PreparedQueryWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// ServiceRead checks for permission to read a given service -func (m *mockAuthorizer) ServiceRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -func (m *mockAuthorizer) ServiceReadAll(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// ServiceWrite checks for permission to create or update a given -// service -func (m *mockAuthorizer) ServiceWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// ServiceWriteAny checks for service:write on any service -func (m *mockAuthorizer) ServiceWriteAny(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -// SessionRead checks for permission to read sessions for a given node. -func (m *mockAuthorizer) SessionRead(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// SessionWrite checks for permission to create sessions for a given -// node. -func (m *mockAuthorizer) SessionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(segment, ctx) - return ret.Get(0).(EnforcementDecision) -} - -// Snapshot checks for permission to take and restore snapshots. -func (m *mockAuthorizer) Snapshot(ctx *AuthorizerContext) EnforcementDecision { - ret := m.Called(ctx) - return ret.Get(0).(EnforcementDecision) -} - -func (p *mockAuthorizer) ToAllowAuthorizer() AllowAuthorizer { - return AllowAuthorizer{Authorizer: p} -} - func TestACL_Enforce(t *testing.T) { type testCase struct { method string @@ -664,7 +443,7 @@ func TestACL_Enforce(t *testing.T) { for _, tcase := range cases { t.Run(testName(tcase), func(t *testing.T) { - m := &mockAuthorizer{} + m := &MockAuthorizer{} if tcase.err == "" { var nilCtx *AuthorizerContext diff --git a/acl/errors_test.go b/acl/errors_test.go index 7c651f1ec3..5910c08e9e 100644 --- a/acl/errors_test.go +++ b/acl/errors_test.go @@ -16,7 +16,7 @@ func TestPermissionDeniedError(t *testing.T) { return t.expected } - auth1 := mockAuthorizer{} + auth1 := MockAuthorizer{} cases := []testCase{ { diff --git a/agent/agent.go b/agent/agent.go index d4ea4e07d2..7c17502664 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "github.com/hashicorp/consul/proto/pboperator" "io" "net" "net/http" @@ -382,6 +383,8 @@ type Agent struct { rpcClientPeering pbpeering.PeeringServiceClient + rpcClientOperator pboperator.OperatorServiceClient + // routineManager is responsible for managing longer running go routines // run by the Agent routineManager *routine.Manager @@ -467,6 +470,7 @@ func New(bd BaseDeps) (*Agent, error) { } a.rpcClientPeering = pbpeering.NewPeeringServiceClient(conn) + a.rpcClientOperator = pboperator.NewOperatorServiceClient(conn) a.serviceManager = NewServiceManager(&a) diff --git a/agent/consul/operator_backend.go b/agent/consul/operator_backend.go new file mode 100644 index 0000000000..8305c8fd20 --- /dev/null +++ b/agent/consul/operator_backend.go @@ -0,0 +1,34 @@ +package consul + +import ( + "context" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/acl/resolver" + "github.com/hashicorp/consul/agent/rpc/operator" + "github.com/hashicorp/consul/proto/pboperator" + "github.com/hashicorp/raft" +) + +type OperatorBackend struct { + srv *Server +} + +// NewOperatorBackend returns a operator.Backend implementation that is bound to the given server. +func NewOperatorBackend(srv *Server) *OperatorBackend { + return &OperatorBackend{ + srv: srv, + } +} + +func (op *OperatorBackend) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) { + return op.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzCtx) +} + +func (op *OperatorBackend) TransferLeader(_ context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) { + reply := new(pboperator.TransferLeaderResponse) + err := op.srv.attemptLeadershipTransfer(raft.ServerID(request.ID)) + reply.Success = err == nil + return reply, err +} + +var _ operator.Backend = (*OperatorBackend)(nil) diff --git a/agent/consul/operator_backend_test.go b/agent/consul/operator_backend_test.go new file mode 100644 index 0000000000..3fdca15af9 --- /dev/null +++ b/agent/consul/operator_backend_test.go @@ -0,0 +1,193 @@ +package consul + +import ( + "context" + "github.com/hashicorp/consul/acl" + external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pboperator" + "github.com/hashicorp/consul/sdk/testutil/retry" + "google.golang.org/grpc/credentials/insecure" + "testing" + "time" + + "github.com/stretchr/testify/require" + gogrpc "google.golang.org/grpc" + + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/testrpc" +) + +func TestOperatorBackend_TransferLeader(t *testing.T) { + t.Parallel() + + conf := testClusterConfig{ + Datacenter: "dc1", + Servers: 3, + ServerConf: func(config *Config) { + config.RaftConfig.HeartbeatTimeout = 2 * time.Second + config.RaftConfig.ElectionTimeout = 2 * time.Second + config.RaftConfig.LeaderLeaseTimeout = 1 * time.Second + }, + } + + nodes := newTestCluster(t, &conf) + s1 := nodes.Servers[0] + // Make sure a leader is elected + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Make a write call to server2 and make sure it gets forwarded to server1 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + // Dial server2 directly + conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(), + gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + gogrpc.WithTransportCredentials(insecure.NewCredentials()), + gogrpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) + + operatorClient := pboperator.NewOperatorServiceClient(conn) + + testutil.RunStep(t, "transfer leader", func(t *testing.T) { + beforeLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, beforeLeader) + // Do the grpc Write call to server2 + req := pboperator.TransferLeaderRequest{ + ID: "", + } + reply, err := operatorClient.TransferLeader(ctx, &req) + require.NoError(t, err) + require.True(t, reply.Success) + time.Sleep(1 * time.Second) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + retry.Run(t, func(r *retry.R) { + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(r, afterLeader) + }) + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, afterLeader) + if afterLeader == beforeLeader { + t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader) + } + }) +} + +func TestOperatorBackend_TransferLeaderWithACL(t *testing.T) { + t.Parallel() + + conf := testClusterConfig{ + Datacenter: "dc1", + Servers: 3, + ServerConf: func(config *Config) { + config.RaftConfig.HeartbeatTimeout = 2 * time.Second + config.RaftConfig.ElectionTimeout = 2 * time.Second + config.RaftConfig.LeaderLeaseTimeout = 1 * time.Second + config.ACLsEnabled = true + config.ACLInitialManagementToken = "root" + config.ACLResolverSettings.ACLDefaultPolicy = "deny" + }, + } + + nodes := newTestCluster(t, &conf) + s1 := nodes.Servers[0] + // Make sure a leader is elected + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Make a write call to server2 and make sure it gets forwarded to server1 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + // Dial server2 directly + conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(), + gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + gogrpc.WithTransportCredentials(insecure.NewCredentials()), + gogrpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) + + operatorClient := pboperator.NewOperatorServiceClient(conn) + + testutil.RunStep(t, "transfer leader no token", func(t *testing.T) { + beforeLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, beforeLeader) + // Do the grpc Write call to server2 + req := pboperator.TransferLeaderRequest{ + ID: "", + } + reply, err := operatorClient.TransferLeader(ctx, &req) + require.True(t, acl.IsErrPermissionDenied(err)) + require.Nil(t, reply) + time.Sleep(1 * time.Second) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + retry.Run(t, func(r *retry.R) { + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(r, afterLeader) + }) + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, afterLeader) + if afterLeader != beforeLeader { + t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader) + } + }) + + testutil.RunStep(t, "transfer leader operator read token", func(t *testing.T) { + + beforeLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, beforeLeader) + // Do the grpc Write call to server2 + req := pboperator.TransferLeaderRequest{ + ID: "", + } + codec := rpcClient(t, s1) + rules := `operator = "read"` + tokenRead := createToken(t, codec, rules) + + ctxToken, err := external.ContextWithQueryOptions(ctx, structs.QueryOptions{Token: tokenRead}) + require.NoError(t, err) + reply, err := operatorClient.TransferLeader(ctxToken, &req) + require.True(t, acl.IsErrPermissionDenied(err)) + require.Nil(t, reply) + time.Sleep(1 * time.Second) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + retry.Run(t, func(r *retry.R) { + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(r, afterLeader) + }) + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, afterLeader) + if afterLeader != beforeLeader { + t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader) + } + }) + + testutil.RunStep(t, "transfer leader operator write token", func(t *testing.T) { + + beforeLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, beforeLeader) + // Do the grpc Write call to server2 + req := pboperator.TransferLeaderRequest{ + ID: "", + } + codec := rpcClient(t, s1) + rules := `operator = "write"` + tokenWrite := createTokenWithPolicyNameFull(t, codec, "the-policy-write", rules, "root") + ctxToken, err := external.ContextWithQueryOptions(ctx, structs.QueryOptions{Token: tokenWrite.SecretID}) + require.NoError(t, err) + reply, err := operatorClient.TransferLeader(ctxToken, &req) + require.NoError(t, err) + require.True(t, reply.Success) + time.Sleep(1 * time.Second) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + retry.Run(t, func(r *retry.R) { + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(r, afterLeader) + }) + afterLeader, _ := s1.raft.LeaderWithID() + require.NotEmpty(t, afterLeader) + if afterLeader == beforeLeader { + t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader) + } + }) +} diff --git a/agent/consul/operator_raft_endpoint.go b/agent/consul/operator_raft_endpoint.go index 328f8ff964..a0c194e7ae 100644 --- a/agent/consul/operator_raft_endpoint.go +++ b/agent/consul/operator_raft_endpoint.go @@ -2,6 +2,7 @@ package consul import ( "fmt" + "net" "github.com/hashicorp/raft" diff --git a/agent/consul/server.go b/agent/consul/server.go index bc57f29443..24cf98e563 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -21,7 +21,6 @@ import ( "github.com/hashicorp/go-connlimit" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-version" "github.com/hashicorp/raft" autopilot "github.com/hashicorp/raft-autopilot" raftboltdb "github.com/hashicorp/raft-boltdb/v2" @@ -51,6 +50,7 @@ import ( "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/rpc/middleware" + "github.com/hashicorp/consul/agent/rpc/operator" "github.com/hashicorp/consul/agent/rpc/peering" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" @@ -370,6 +370,9 @@ type Server struct { // peeringBackend is shared between the external and internal gRPC services for peering peeringBackend *PeeringBackend + // operatorBackend is shared between the external and internal gRPC services for peering + operatorBackend *OperatorBackend + // peerStreamServer is a server used to handle peering streams from external clusters. peerStreamServer *peerstream.Server @@ -385,6 +388,7 @@ type Server struct { // embedded struct to hold all the enterprise specific data EnterpriseServer + operatorServer *operator.Server } type connHandler interface { Run() error @@ -739,6 +743,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser }).Register(s.externalGRPCServer) s.peeringBackend = NewPeeringBackend(s) + s.operatorBackend = NewOperatorBackend(s) s.peerStreamServer = peerstream.NewServer(peerstream.Config{ Backend: s.peeringBackend, GetStore: func() peerstream.StateStore { return s.FSM().State() }, @@ -826,6 +831,19 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler PeeringEnabled: config.PeeringEnabled, }) s.peeringServer = p + o := operator.NewServer(operator.Config{ + Backend: s.operatorBackend, + Logger: deps.Logger.Named("grpc-api.operator"), + ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) { + // Only forward the request if the dc in the request matches the server's datacenter. + if info.RequestDatacenter() != "" && info.RequestDatacenter() != config.Datacenter { + return false, fmt.Errorf("requests to transfer leader cannot be forwarded to remote datacenters") + } + return s.ForwardGRPC(s.grpcConnPool, info, fn) + }, + Datacenter: config.Datacenter, + }) + s.operatorServer = o register := func(srv *grpc.Server) { if config.RPCConfig.EnableStreaming { @@ -834,6 +852,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler deps.Logger.Named("grpc-api.subscription"))) } s.peeringServer.Register(srv) + s.operatorServer.Register(srv) s.registerEnterpriseGRPCServices(deps, srv) // Note: these external gRPC services are also exposed on the internal server to @@ -1193,20 +1212,25 @@ func (s *Server) Shutdown() error { return nil } -func (s *Server) attemptLeadershipTransfer() (success bool) { - leadershipTransferVersion := version.Must(version.NewVersion(LeaderTransferMinVersion)) - - ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, leadershipTransferVersion) - if !ok { - return false +func (s *Server) attemptLeadershipTransfer(id raft.ServerID) (err error) { + var addr raft.ServerAddress + if id != "" { + addr, err = s.serverLookup.ServerAddr(id) + if err != nil { + return err + } + future := s.raft.LeadershipTransferToServer(id, addr) + if err := future.Error(); err != nil { + return err + } + } else { + future := s.raft.LeadershipTransfer() + if err := future.Error(); err != nil { + return err + } } - future := s.raft.LeadershipTransfer() - if err := future.Error(); err != nil { - s.logger.Error("failed to transfer leadership, removing the server", "error", err) - return false - } - return true + return nil } // Leave is used to prepare for a graceful shutdown. @@ -1228,7 +1252,7 @@ func (s *Server) Leave() error { // removed for some reasonable period of time. isLeader := s.IsLeader() if isLeader && numPeers > 1 { - if s.attemptLeadershipTransfer() { + if err := s.attemptLeadershipTransfer(""); err == nil { isLeader = false } else { future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0) diff --git a/agent/http_register.go b/agent/http_register.go index 6dd8b41c60..caa55d5ca4 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -99,6 +99,7 @@ func init() { registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPHandlers).ACLAuthorize) registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPHandlers).KVSEndpoint) registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPHandlers).OperatorRaftConfiguration) + registerEndpoint("/v1/operator/raft/transfer-leader", []string{"POST"}, (*HTTPHandlers).OperatorRaftTransferLeader) registerEndpoint("/v1/operator/raft/peer", []string{"DELETE"}, (*HTTPHandlers).OperatorRaftPeer) registerEndpoint("/v1/operator/keyring", []string{"GET", "POST", "PUT", "DELETE"}, (*HTTPHandlers).OperatorKeyringEndpoint) registerEndpoint("/v1/operator/autopilot/configuration", []string{"GET", "PUT"}, (*HTTPHandlers).OperatorAutopilotConfiguration) diff --git a/agent/operator_endpoint.go b/agent/operator_endpoint.go index 851ef52e1c..10af5e31d0 100644 --- a/agent/operator_endpoint.go +++ b/agent/operator_endpoint.go @@ -2,6 +2,8 @@ package agent import ( "fmt" + external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/proto/pboperator" "net/http" "strconv" "time" @@ -31,6 +33,43 @@ func (s *HTTPHandlers) OperatorRaftConfiguration(resp http.ResponseWriter, req * return reply, nil } +// OperatorRaftTransferLeader is used to transfer raft cluster leadership to another node +func (s *HTTPHandlers) OperatorRaftTransferLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + + var entMeta acl.EnterpriseMeta + if err := s.parseEntMetaPartition(req, &entMeta); err != nil { + return nil, err + } + + params := req.URL.Query() + _, hasID := params["id"] + ID := "" + if hasID { + ID = params.Get("id") + } + args := pboperator.TransferLeaderRequest{ + ID: ID, + } + + var token string + s.parseToken(req, &token) + ctx, err := external.ContextWithQueryOptions(req.Context(), structs.QueryOptions{Token: token}) + if err != nil { + return nil, err + } + + result, err := s.agent.rpcClientOperator.TransferLeader(ctx, &args) + if err != nil { + return nil, err + } + if result.Success != true { + return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("Failed to transfer Leader: %s", err.Error())} + } + reply := new(api.TransferLeaderResponse) + pboperator.TransferLeaderResponseToAPI(result, reply) + return reply, nil +} + // OperatorRaftPeer supports actions on Raft peers. Currently we only support // removing peers by address. func (s *HTTPHandlers) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/agent/rpc/operator/service.go b/agent/rpc/operator/service.go new file mode 100644 index 0000000000..cbe876a7f1 --- /dev/null +++ b/agent/rpc/operator/service.go @@ -0,0 +1,103 @@ +package operator + +import ( + "context" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/acl/resolver" + external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pboperator" + "github.com/hashicorp/go-hclog" + "google.golang.org/grpc" +) + +// For private/internal gRPC handlers, protoc-gen-rpc-glue generates the +// requisite methods to satisfy the structs.RPCInfo interface using fields +// from the pbcommon package. This service is public, so we can't use those +// fields in our proto definition. Instead, we construct our RPCInfo manually. +var writeRequest struct { + structs.WriteRequest + structs.DCSpecificRequest +} + +var readRequest struct { + structs.QueryOptions + structs.DCSpecificRequest +} + +// Server implements pboperator.OperatorService to provide RPC operations for +// managing operator operation. +type Server struct { + Config +} + +func (s *Server) TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) { + resp := &pboperator.TransferLeaderResponse{Success: false} + handled, err := s.ForwardRPC(&writeRequest, func(conn *grpc.ClientConn) error { + ctx := external.ForwardMetadataContext(ctx) + var err error + resp, err = pboperator.NewOperatorServiceClient(conn).TransferLeader(ctx, request) + return err + }) + if handled || err != nil { + return resp, err + } + + var authzCtx acl.AuthorizerContext + entMeta := structs.DefaultEnterpriseMetaInDefaultPartition() + + options, err := external.QueryOptionsFromContext(ctx) + if err != nil { + return nil, err + } + + authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx) + if err != nil { + return resp, err + } + + if err := authz.ToAllowAuthorizer().OperatorWriteAllowed(&authzCtx); err != nil { + return resp, err + } + + return s.Backend.TransferLeader(ctx, request) +} + +type Config struct { + Backend Backend + Logger hclog.Logger + ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) + Datacenter string +} + +func NewServer(cfg Config) *Server { + requireNotNil(cfg.Backend, "Backend") + requireNotNil(cfg.Logger, "Logger") + requireNotNil(cfg.ForwardRPC, "ForwardRPC") + if cfg.Datacenter == "" { + panic("Datacenter is required") + } + return &Server{ + Config: cfg, + } +} + +func requireNotNil(v interface{}, name string) { + if v == nil { + panic(name + " is required") + } +} + +var _ pboperator.OperatorServiceServer = (*Server)(nil) + +func (s *Server) Register(grpcServer *grpc.Server) { + pboperator.RegisterOperatorServiceServer(grpcServer, s) +} + +// Backend defines the core integrations the Operator endpoint depends on. A +// functional implementation will integrate with various operator operation such as +// raft, autopilot operation. The only currently implemented operation is raft leader transfer +type Backend interface { + TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) + ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) +} diff --git a/agent/rpc/operator/service_test.go b/agent/rpc/operator/service_test.go new file mode 100644 index 0000000000..7686c43241 --- /dev/null +++ b/agent/rpc/operator/service_test.go @@ -0,0 +1,104 @@ +package operator + +import ( + "context" + "fmt" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/acl/resolver" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pboperator" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc" + "testing" + + "github.com/stretchr/testify/require" +) + +type MockBackend struct { + mock.Mock + authorizer acl.Authorizer +} + +func (m *MockBackend) TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) { + called := m.Called(ctx, request) + ret := called.Get(0) + if ret == nil { + return nil, called.Error(1) + } + return ret.(*pboperator.TransferLeaderResponse), called.Error(1) +} + +func (m *MockBackend) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) { + return resolver.Result{Authorizer: m.authorizer}, nil +} + +func TestLeaderTransfer_ACL_Deny(t *testing.T) { + authorizer := acl.MockAuthorizer{} + authorizer.On("OperatorWrite", mock.Anything).Return(acl.Deny) + server := NewServer(Config{Datacenter: "dc1", Backend: &MockBackend{authorizer: &authorizer}, Logger: hclog.New(nil), ForwardRPC: doForwardRPC}) + + _, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{}) + require.Error(t, err) + require.Equal(t, "Permission denied: provided token lacks permission 'operator:write'", err.Error()) +} + +func TestLeaderTransfer_ACL_Allowed(t *testing.T) { + authorizer := &acl.MockAuthorizer{} + authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow) + + backend := &MockBackend{authorizer: authorizer} + backend.On("TransferLeader", mock.Anything, mock.Anything).Return(nil, nil) + server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC}) + + _, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{}) + require.NoError(t, err) +} + +func TestLeaderTransfer_LeaderTransfer_Fail(t *testing.T) { + authorizer := &acl.MockAuthorizer{} + authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow) + + backend := &MockBackend{authorizer: authorizer} + backend.On("TransferLeader", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("test")) + server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC}) + + _, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{}) + require.Error(t, err) + require.Equal(t, "test", err.Error()) +} + +func TestLeaderTransfer_LeaderTransfer_Success(t *testing.T) { + authorizer := &acl.MockAuthorizer{} + authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow) + + backend := &MockBackend{authorizer: authorizer} + backend.On("TransferLeader", mock.Anything, mock.Anything).Return(&pboperator.TransferLeaderResponse{Success: true}, nil) + server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC}) + + ret, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{}) + require.NoError(t, err) + require.NotNil(t, ret) + require.True(t, ret.Success) +} + +func TestLeaderTransfer_LeaderTransfer_ForwardRPC(t *testing.T) { + authorizer := &acl.MockAuthorizer{} + authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow) + + backend := &MockBackend{authorizer: authorizer} + backend.On("TransferLeader", mock.Anything, mock.Anything).Return(&pboperator.TransferLeaderResponse{}, nil) + server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: noopForwardRPC}) + + ret, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{}) + require.NoError(t, err) + require.NotNil(t, ret) + require.False(t, ret.Success) +} +func noopForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) { + return true, nil +} + +func doForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) { + return false, nil +} diff --git a/api/operator_raft.go b/api/operator_raft.go index 1b48fdcd9b..1da20e899f 100644 --- a/api/operator_raft.go +++ b/api/operator_raft.go @@ -36,6 +36,11 @@ type RaftConfiguration struct { Index uint64 } +// TransferLeaderResponse is returned when querying for the current Raft configuration. +type TransferLeaderResponse struct { + Success bool +} + // RaftGetConfiguration is used to query the current Raft peer set. func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { r := op.c.newRequest("GET", "/v1/operator/raft/configuration") @@ -56,6 +61,26 @@ func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, e return &out, nil } +// RaftLeaderTransfer is used to transfer the current raft leader to another node +func (op *Operator) RaftLeaderTransfer(q *QueryOptions) (*TransferLeaderResponse, error) { + r := op.c.newRequest("POST", "/v1/operator/raft/transfer-leader") + r.setQueryOptions(q) + _, resp, err := op.c.doRequest(r) + if err != nil { + return nil, err + } + defer closeResponseBody(resp) + if err := requireOK(resp); err != nil { + return nil, err + } + + var out TransferLeaderResponse + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + return &out, nil +} + // RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft // quorum but no longer known to Serf or the catalog) by address in the form of // "IP:port". diff --git a/api/operator_raft_test.go b/api/operator_raft_test.go index a6eada42cd..ecefaa9719 100644 --- a/api/operator_raft_test.go +++ b/api/operator_raft_test.go @@ -36,3 +36,21 @@ func TestAPI_OperatorRaftRemovePeerByAddress(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestAPI_OperatorRaftLeaderTransfer(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + // If we get this error, it proves we sent the address all the way + // through. + operator := c.Operator() + transfer, err := operator.RaftLeaderTransfer(nil) + if err == nil || !strings.Contains(err.Error(), + "cannot find peer") { + t.Fatalf("err: %v", err) + } + if transfer != nil { + t.Fatalf("err:%v", transfer) + } +} diff --git a/command/operator/raft/transferleader/transfer_leader.go b/command/operator/raft/transferleader/transfer_leader.go new file mode 100644 index 0000000000..b7d8c468eb --- /dev/null +++ b/command/operator/raft/transferleader/transfer_leader.go @@ -0,0 +1,90 @@ +package transferleader + +import ( + "flag" + "fmt" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/flags" + "github.com/mitchellh/cli" +) + +func New(ui cli.Ui) *cmd { + c := &cmd{UI: ui} + c.init() + return c +} + +type cmd struct { + UI cli.Ui + flags *flag.FlagSet + http *flags.HTTPFlags + help string + id string +} + +func (c *cmd) init() { + c.flags = flag.NewFlagSet("", flag.ContinueOnError) + c.http = &flags.HTTPFlags{} + c.flags.StringVar(&c.id, "id", "", + "The ID to remove from the Raft configuration.") + flags.Merge(c.flags, c.http.ClientFlags()) + flags.Merge(c.flags, c.http.ServerFlags()) + c.help = flags.Usage(help, c.flags) +} + +func (c *cmd) Run(args []string) int { + if err := c.flags.Parse(args); err != nil { + if err == flag.ErrHelp { + return 0 + } + c.UI.Error(fmt.Sprintf("Failed to parse args: %v", err)) + return 1 + } + + // Set up a client. + client, err := c.http.APIClient() + if err != nil { + c.UI.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Fetch the current configuration. + result, err := raftTransferLeader(client, c.http.Stale()) + if err != nil { + c.UI.Error(fmt.Sprintf("Error transfering leadership: %v", err)) + return 1 + } + + c.UI.Output(result) + return 0 +} + +func raftTransferLeader(client *api.Client, stale bool) (string, error) { + q := &api.QueryOptions{ + AllowStale: stale, + } + reply, err := client.Operator().RaftLeaderTransfer(q) + if err != nil { + return "", fmt.Errorf("Failed to transfer leadership %w", err) + } + if !reply.Success { + return "", fmt.Errorf("Failed to transfer leadership") + } + return "Success", nil +} + +func (c *cmd) Synopsis() string { + return synopsis +} + +func (c *cmd) Help() string { + return c.help +} + +const synopsis = "Transfer raft leadership to another node" +const help = ` +Usage: consul operator raft transfer-leader [options] + + Transfer raft leadership to another node. +` diff --git a/command/operator/raft/transferleader/transfer_leader_test.go b/command/operator/raft/transferleader/transfer_leader_test.go new file mode 100644 index 0000000000..b597971198 --- /dev/null +++ b/command/operator/raft/transferleader/transfer_leader_test.go @@ -0,0 +1,43 @@ +package transferleader + +import ( + "github.com/hashicorp/consul/agent" + "github.com/mitchellh/cli" + "strings" + "testing" +) + +func TestOperatorRaftTransferLeaderCommand_noTabs(t *testing.T) { + t.Parallel() + if strings.ContainsRune(New(cli.NewMockUi()).Help(), '\t') { + t.Fatal("help has tabs") + } +} + +// This only test that the command behave correctly when only one agent is present +// and no leadership transfer is possible, testing for the functionality will be done at the RPC level. +func TestOperatorRaftTransferLeaderWithSingleNode(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := agent.NewTestAgent(t, ``) + defer a.Shutdown() + + expected := "cannot find peer" + + // Test the transfer-leader subcommand directly + ui := cli.NewMockUi() + c := New(ui) + + args := []string{"-http-addr=" + a.HTTPAddr()} + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + output := strings.TrimSpace(ui.ErrorWriter.String()) + if !strings.Contains(output, expected) { + t.Fatalf("bad: %q, %q", output, expected) + } +} diff --git a/command/registry.go b/command/registry.go index b35ac2e424..1d34f746ca 100644 --- a/command/registry.go +++ b/command/registry.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/hashicorp/consul/command/operator/raft/transferleader" "os" "os/signal" "syscall" @@ -220,6 +221,7 @@ func RegisteredCommands(ui cli.Ui) map[string]mcli.CommandFactory { entry{"operator raft", func(cli.Ui) (cli.Command, error) { return operraft.New(), nil }}, entry{"operator raft list-peers", func(ui cli.Ui) (cli.Command, error) { return operraftlist.New(ui), nil }}, entry{"operator raft remove-peer", func(ui cli.Ui) (cli.Command, error) { return operraftremove.New(ui), nil }}, + entry{"operator raft transfer-leader", func(ui cli.Ui) (cli.Command, error) { return transferleader.New(ui), nil }}, entry{"peering", func(cli.Ui) (cli.Command, error) { return peering.New(), nil }}, entry{"peering delete", func(ui cli.Ui) (cli.Command, error) { return peerdelete.New(ui), nil }}, entry{"peering generate-token", func(ui cli.Ui) (cli.Command, error) { return peergenerate.New(ui), nil }}, diff --git a/proto/pboperator/operator.gen.go b/proto/pboperator/operator.gen.go new file mode 100644 index 0000000000..83bd446197 --- /dev/null +++ b/proto/pboperator/operator.gen.go @@ -0,0 +1,18 @@ +// Code generated by mog. DO NOT EDIT. + +package pboperator + +import "github.com/hashicorp/consul/api" + +func TransferLeaderResponseToAPI(s *TransferLeaderResponse, t *api.TransferLeaderResponse) { + if s == nil { + return + } + t.Success = s.Success +} +func TransferLeaderResponseFromAPI(t *api.TransferLeaderResponse, s *TransferLeaderResponse) { + if s == nil { + return + } + s.Success = t.Success +} diff --git a/proto/pboperator/operator.pb.binary.go b/proto/pboperator/operator.pb.binary.go new file mode 100644 index 0000000000..594bcde67f --- /dev/null +++ b/proto/pboperator/operator.pb.binary.go @@ -0,0 +1,28 @@ +// Code generated by protoc-gen-go-binary. DO NOT EDIT. +// source: proto/pboperator/operator.proto + +package pboperator + +import ( + "github.com/golang/protobuf/proto" +) + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *TransferLeaderRequest) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *TransferLeaderRequest) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *TransferLeaderResponse) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *TransferLeaderResponse) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} diff --git a/proto/pboperator/operator.pb.go b/proto/pboperator/operator.pb.go new file mode 100644 index 0000000000..317822eec5 --- /dev/null +++ b/proto/pboperator/operator.pb.go @@ -0,0 +1,242 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc (unknown) +// source: proto/pboperator/operator.proto + +package pboperator + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type TransferLeaderRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"` +} + +func (x *TransferLeaderRequest) Reset() { + *x = TransferLeaderRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_pboperator_operator_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TransferLeaderRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TransferLeaderRequest) ProtoMessage() {} + +func (x *TransferLeaderRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_pboperator_operator_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TransferLeaderRequest.ProtoReflect.Descriptor instead. +func (*TransferLeaderRequest) Descriptor() ([]byte, []int) { + return file_proto_pboperator_operator_proto_rawDescGZIP(), []int{0} +} + +func (x *TransferLeaderRequest) GetID() string { + if x != nil { + return x.ID + } + return "" +} + +// mog annotation: +// +// target=github.com/hashicorp/consul/api.TransferLeaderResponse +// output=operator.gen.go +// name=API +type TransferLeaderResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // true if the transfer is a success + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` +} + +func (x *TransferLeaderResponse) Reset() { + *x = TransferLeaderResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_pboperator_operator_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TransferLeaderResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TransferLeaderResponse) ProtoMessage() {} + +func (x *TransferLeaderResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_pboperator_operator_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TransferLeaderResponse.ProtoReflect.Descriptor instead. +func (*TransferLeaderResponse) Descriptor() ([]byte, []int) { + return file_proto_pboperator_operator_proto_rawDescGZIP(), []int{1} +} + +func (x *TransferLeaderResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +var File_proto_pboperator_operator_proto protoreflect.FileDescriptor + +var file_proto_pboperator_operator_proto_rawDesc = []byte{ + 0x0a, 0x1f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, + 0x6f, 0x72, 0x2f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x22, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, + 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65, + 0x72, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x27, 0x0a, 0x15, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, + 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, + 0x0a, 0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x22, 0x32, + 0x0a, 0x16, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x32, 0x9d, 0x01, 0x0a, 0x0f, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x39, 0x2e, 0x68, 0x61, 0x73, 0x68, + 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x3a, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, + 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, + 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x42, 0x91, 0x02, 0x0a, 0x26, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, + 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x42, 0x0d, 0x4f, + 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2c, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, + 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x70, 0x62, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xa2, 0x02, 0x04, 0x48, + 0x43, 0x49, 0x4f, 0xaa, 0x02, 0x22, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, + 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, + 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xca, 0x02, 0x22, 0x48, 0x61, 0x73, 0x68, 0x69, + 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xe2, 0x02, 0x2e, + 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, + 0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, + 0x6f, 0x72, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, + 0x25, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73, + 0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x3a, 0x3a, 0x4f, 0x70, + 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_pboperator_operator_proto_rawDescOnce sync.Once + file_proto_pboperator_operator_proto_rawDescData = file_proto_pboperator_operator_proto_rawDesc +) + +func file_proto_pboperator_operator_proto_rawDescGZIP() []byte { + file_proto_pboperator_operator_proto_rawDescOnce.Do(func() { + file_proto_pboperator_operator_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_pboperator_operator_proto_rawDescData) + }) + return file_proto_pboperator_operator_proto_rawDescData +} + +var file_proto_pboperator_operator_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_pboperator_operator_proto_goTypes = []interface{}{ + (*TransferLeaderRequest)(nil), // 0: hashicorp.consul.internal.operator.TransferLeaderRequest + (*TransferLeaderResponse)(nil), // 1: hashicorp.consul.internal.operator.TransferLeaderResponse +} +var file_proto_pboperator_operator_proto_depIdxs = []int32{ + 0, // 0: hashicorp.consul.internal.operator.OperatorService.TransferLeader:input_type -> hashicorp.consul.internal.operator.TransferLeaderRequest + 1, // 1: hashicorp.consul.internal.operator.OperatorService.TransferLeader:output_type -> hashicorp.consul.internal.operator.TransferLeaderResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_pboperator_operator_proto_init() } +func file_proto_pboperator_operator_proto_init() { + if File_proto_pboperator_operator_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_pboperator_operator_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TransferLeaderRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_pboperator_operator_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TransferLeaderResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_pboperator_operator_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_pboperator_operator_proto_goTypes, + DependencyIndexes: file_proto_pboperator_operator_proto_depIdxs, + MessageInfos: file_proto_pboperator_operator_proto_msgTypes, + }.Build() + File_proto_pboperator_operator_proto = out.File + file_proto_pboperator_operator_proto_rawDesc = nil + file_proto_pboperator_operator_proto_goTypes = nil + file_proto_pboperator_operator_proto_depIdxs = nil +} diff --git a/proto/pboperator/operator.proto b/proto/pboperator/operator.proto new file mode 100644 index 0000000000..71d0a6caf3 --- /dev/null +++ b/proto/pboperator/operator.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package hashicorp.consul.internal.operator; + +// Operator defines a set of operators operation applicable to Consul +service OperatorService { + //Transfer raft leadership to another node + rpc TransferLeader(TransferLeaderRequest) returns (TransferLeaderResponse) {} +} + +message TransferLeaderRequest { + string ID = 1; +} + +// +// mog annotation: +// +// target=github.com/hashicorp/consul/api.TransferLeaderResponse +// output=operator.gen.go +// name=API +message TransferLeaderResponse { + // true if the transfer is a success + bool success = 1; +} diff --git a/proto/pboperator/operator_grpc.pb.go b/proto/pboperator/operator_grpc.pb.go new file mode 100644 index 0000000000..b8dac9336c --- /dev/null +++ b/proto/pboperator/operator_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc (unknown) +// source: proto/pboperator/operator.proto + +package pboperator + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// OperatorServiceClient is the client API for OperatorService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type OperatorServiceClient interface { + // Transfer raft leadership to another node + TransferLeader(ctx context.Context, in *TransferLeaderRequest, opts ...grpc.CallOption) (*TransferLeaderResponse, error) +} + +type operatorServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewOperatorServiceClient(cc grpc.ClientConnInterface) OperatorServiceClient { + return &operatorServiceClient{cc} +} + +func (c *operatorServiceClient) TransferLeader(ctx context.Context, in *TransferLeaderRequest, opts ...grpc.CallOption) (*TransferLeaderResponse, error) { + out := new(TransferLeaderResponse) + err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.operator.OperatorService/TransferLeader", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// OperatorServiceServer is the server API for OperatorService service. +// All implementations should embed UnimplementedOperatorServiceServer +// for forward compatibility +type OperatorServiceServer interface { + // Transfer raft leadership to another node + TransferLeader(context.Context, *TransferLeaderRequest) (*TransferLeaderResponse, error) +} + +// UnimplementedOperatorServiceServer should be embedded to have forward compatible implementations. +type UnimplementedOperatorServiceServer struct { +} + +func (UnimplementedOperatorServiceServer) TransferLeader(context.Context, *TransferLeaderRequest) (*TransferLeaderResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransferLeader not implemented") +} + +// UnsafeOperatorServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to OperatorServiceServer will +// result in compilation errors. +type UnsafeOperatorServiceServer interface { + mustEmbedUnimplementedOperatorServiceServer() +} + +func RegisterOperatorServiceServer(s grpc.ServiceRegistrar, srv OperatorServiceServer) { + s.RegisterService(&OperatorService_ServiceDesc, srv) +} + +func _OperatorService_TransferLeader_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TransferLeaderRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OperatorServiceServer).TransferLeader(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/hashicorp.consul.internal.operator.OperatorService/TransferLeader", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OperatorServiceServer).TransferLeader(ctx, req.(*TransferLeaderRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// OperatorService_ServiceDesc is the grpc.ServiceDesc for OperatorService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var OperatorService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "hashicorp.consul.internal.operator.OperatorService", + HandlerType: (*OperatorServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "TransferLeader", + Handler: _OperatorService_TransferLeader_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "proto/pboperator/operator.proto", +} diff --git a/website/content/api-docs/operator/raft.mdx b/website/content/api-docs/operator/raft.mdx index 1125a2c624..e22321a182 100644 --- a/website/content/api-docs/operator/raft.mdx +++ b/website/content/api-docs/operator/raft.mdx @@ -145,3 +145,37 @@ $ curl \ --request DELETE \ "http://127.0.0.1:8500/v1/operator/raft/peer?address=1.2.3.4:5678" ``` + +## Transfer Raft Leadership + +This endpoint transfers the Raft leadership from the current leader to a different Raft peer. +The new leader is selected at random unless explicitly specified with the `id` parameter. + +| Method | Path | Produces | +| -------- | ------------------------------- | ------------------ | +| `POST` | `/operator/raft/transfer-leader` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/api-docs/features/blocking), +[consistency modes](/api-docs/features/consistency), +[agent caching](/api-docs/features/caching), and +[required ACLs](/api-docs#authentication). + +| Blocking Queries | Consistency Modes | Agent Caching | ACL Required | +| ---------------- | ----------------- | ------------- | ---------------- | +| `NO` | `none` | `none` | `operator:write` | + +The corresponding CLI command is [`consul operator raft transfer-leader`](/commands/operator/raft#transfer-leader). + +### Query Parameters + +- `id` `(string: "")` - Specifies the node ID of the Raft peer to transfer leadership to. +If empty, leadership transfers to a random server agent. + +### Sample Request + +```shell-session +$ curl \ + --request POST \ + "http://127.0.0.1:8500/v1/operator/raft/transfer-leader?id=09cfc046-e74a-ad49-1aad-c2161b7fe677" +``` diff --git a/website/content/commands/operator/raft.mdx b/website/content/commands/operator/raft.mdx index b6a10dab6e..aa9d9ffc93 100644 --- a/website/content/commands/operator/raft.mdx +++ b/website/content/commands/operator/raft.mdx @@ -104,3 +104,26 @@ Usage: `consul operator raft remove-peer -address="IP:port"` - `-id` - ID of the server to remove. The return code will indicate success or failure. + +## transfer-leader + +Corresponding HTTP API Endpoint: [\[POST\] /v1/operator/raft/transfer-leader](/api-docs/operator/raft#transfer-raft-leadership) + +This command transfers Raft leadership to another server agent. If an `id` is provided, Consul transfers leadership to the server with that id. + +Use this command to change leadership without restarting the leader node, which maintains quorum and workload capacity. + +The table below shows this command's [required ACLs](/api-docs#authentication). Configuration of +[blocking queries](/api-docs/features/blocking) and [agent caching](/api-docs/features/caching) +are not supported from commands, but may be from the corresponding HTTP endpoint. + +| ACL Required | +| ---------------- | +| `operator:write` | + +Usage: `consul operator raft transfer-leader -id="server id"` + +- `-id` - Specifies the node ID of the raft peer to transfer leadership to. +If empty, leadership transfers to a random server agent. + +The return code indicates success or failure.