mirror of https://github.com/status-im/consul.git
Leadership transfer cmd (#14132)
* add leadership transfer command * add RPC call test (flaky) * add missing import * add changelog * add command registration * Apply suggestions from code review Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com> * add the possibility of providing an id to raft leadership transfer. Add few tests. * delete old file from cherry pick * rename changelog filename to PR # * rename changelog and fix import * fix failing test * check for OperatorWrite Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com> * rename from leader-transfer to transfer-leader * remove version check and add test for operator read * move struct to operator.go * first pass * add code for leader transfer in the grpc backend and tests * wire the http endpoint to the new grpc endpoint * remove the RPC endpoint * remove non needed struct * fix naming * add mog glue to API * fix comment * remove dead code * fix linter error * change package name for proto file * remove error wrapping * fix failing test * add command registration * add grpc service mock tests * fix receiver to be pointer * use defined values Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com> * reuse MockAclAuthorizer * add documentation * remove usage of external.TokenFromContext * fix failing tests * fix proto generation * Apply suggestions from code review Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * Apply suggestions from code review * add more context in doc for the reason * Apply suggestions from docs code review Co-authored-by: Jeff Boruszak <104028618+boruszak@users.noreply.github.com> * regenerate proto * fix linter errors Co-authored-by: github-team-consul-core <github-team-consul-core@hashicorp.com> Co-authored-by: Matt Keeler <mkeeler@users.noreply.github.com> Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> Co-authored-by: Jeff Boruszak <104028618+boruszak@users.noreply.github.com>
This commit is contained in:
parent
ef61bdf3c2
commit
225ae55e83
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:enhancement
|
||||||
|
raft: add an operator api endpoint and a command to initiate raft leadership transfer.
|
||||||
|
```
|
|
@ -0,0 +1,223 @@
|
||||||
|
package acl
|
||||||
|
|
||||||
|
import "github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
|
type MockAuthorizer struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Authorizer = (*MockAuthorizer)(nil)
|
||||||
|
|
||||||
|
// ACLRead checks for permission to list all the ACLs
|
||||||
|
func (m *MockAuthorizer) ACLRead(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ACLWrite checks for permission to manipulate ACLs
|
||||||
|
func (m *MockAuthorizer) ACLWrite(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AgentRead checks for permission to read from agent endpoints for a
|
||||||
|
// given node.
|
||||||
|
func (m *MockAuthorizer) AgentRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AgentWrite checks for permission to make changes via agent endpoints
|
||||||
|
// for a given node.
|
||||||
|
func (m *MockAuthorizer) AgentWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventRead determines if a specific event can be queried.
|
||||||
|
func (m *MockAuthorizer) EventRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventWrite determines if a specific event may be fired.
|
||||||
|
func (m *MockAuthorizer) EventWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IntentionDefaultAllow determines the default authorized behavior
|
||||||
|
// when no intentions match a Connect request.
|
||||||
|
func (m *MockAuthorizer) IntentionDefaultAllow(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IntentionRead determines if a specific intention can be read.
|
||||||
|
func (m *MockAuthorizer) IntentionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IntentionWrite determines if a specific intention can be
|
||||||
|
// created, modified, or deleted.
|
||||||
|
func (m *MockAuthorizer) IntentionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyList checks for permission to list keys under a prefix
|
||||||
|
func (m *MockAuthorizer) KeyList(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyRead checks for permission to read a given key
|
||||||
|
func (m *MockAuthorizer) KeyRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyWrite checks for permission to write a given key
|
||||||
|
func (m *MockAuthorizer) KeyWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyWritePrefix checks for permission to write to an
|
||||||
|
// entire key prefix. This means there must be no sub-policies
|
||||||
|
// that deny a write.
|
||||||
|
func (m *MockAuthorizer) KeyWritePrefix(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyringRead determines if the encryption keyring used in
|
||||||
|
// the gossip layer can be read.
|
||||||
|
func (m *MockAuthorizer) KeyringRead(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyringWrite determines if the keyring can be manipulated
|
||||||
|
func (m *MockAuthorizer) KeyringWrite(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeRead checks for permission to read (discover) a given node.
|
||||||
|
func (m *MockAuthorizer) NodeRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAuthorizer) NodeReadAll(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeWrite checks for permission to create or update (register) a
|
||||||
|
// given node.
|
||||||
|
func (m *MockAuthorizer) NodeWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAuthorizer) MeshRead(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAuthorizer) MeshWrite(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PeeringRead determines if the read-only Consul peering functions
|
||||||
|
// can be used.
|
||||||
|
func (m *MockAuthorizer) PeeringRead(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PeeringWrite determines if the state-changing Consul peering
|
||||||
|
// functions can be used.
|
||||||
|
func (m *MockAuthorizer) PeeringWrite(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OperatorRead determines if the read-only Consul operator functions
|
||||||
|
// can be used. ret := m.Called(segment, ctx)
|
||||||
|
func (m *MockAuthorizer) OperatorRead(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OperatorWrite determines if the state-changing Consul operator
|
||||||
|
// functions can be used.
|
||||||
|
func (m *MockAuthorizer) OperatorWrite(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreparedQueryRead determines if a specific prepared query can be read
|
||||||
|
// to show its contents (this is not used for execution).
|
||||||
|
func (m *MockAuthorizer) PreparedQueryRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PreparedQueryWrite determines if a specific prepared query can be
|
||||||
|
// created, modified, or deleted.
|
||||||
|
func (m *MockAuthorizer) PreparedQueryWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceRead checks for permission to read a given service
|
||||||
|
func (m *MockAuthorizer) ServiceRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAuthorizer) ServiceReadAll(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceWrite checks for permission to create or update a given
|
||||||
|
// service
|
||||||
|
func (m *MockAuthorizer) ServiceWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServiceWriteAny checks for service:write on any service
|
||||||
|
func (m *MockAuthorizer) ServiceWriteAny(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionRead checks for permission to read sessions for a given node.
|
||||||
|
func (m *MockAuthorizer) SessionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionWrite checks for permission to create sessions for a given
|
||||||
|
// node.
|
||||||
|
func (m *MockAuthorizer) SessionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(segment, ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot checks for permission to take and restore snapshots.
|
||||||
|
func (m *MockAuthorizer) Snapshot(ctx *AuthorizerContext) EnforcementDecision {
|
||||||
|
ret := m.Called(ctx)
|
||||||
|
return ret.Get(0).(EnforcementDecision)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *MockAuthorizer) ToAllowAuthorizer() AllowAuthorizer {
|
||||||
|
return AllowAuthorizer{Authorizer: p}
|
||||||
|
}
|
|
@ -4,230 +4,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockAuthorizer struct {
|
|
||||||
mock.Mock
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ Authorizer = (*mockAuthorizer)(nil)
|
|
||||||
|
|
||||||
// ACLRead checks for permission to list all the ACLs
|
|
||||||
func (m *mockAuthorizer) ACLRead(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ACLWrite checks for permission to manipulate ACLs
|
|
||||||
func (m *mockAuthorizer) ACLWrite(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AgentRead checks for permission to read from agent endpoints for a
|
|
||||||
// given node.
|
|
||||||
func (m *mockAuthorizer) AgentRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AgentWrite checks for permission to make changes via agent endpoints
|
|
||||||
// for a given node.
|
|
||||||
func (m *mockAuthorizer) AgentWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// EventRead determines if a specific event can be queried.
|
|
||||||
func (m *mockAuthorizer) EventRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// EventWrite determines if a specific event may be fired.
|
|
||||||
func (m *mockAuthorizer) EventWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IntentionDefaultAllow determines the default authorized behavior
|
|
||||||
// when no intentions match a Connect request.
|
|
||||||
func (m *mockAuthorizer) IntentionDefaultAllow(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IntentionRead determines if a specific intention can be read.
|
|
||||||
func (m *mockAuthorizer) IntentionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IntentionWrite determines if a specific intention can be
|
|
||||||
// created, modified, or deleted.
|
|
||||||
func (m *mockAuthorizer) IntentionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyList checks for permission to list keys under a prefix
|
|
||||||
func (m *mockAuthorizer) KeyList(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyRead checks for permission to read a given key
|
|
||||||
func (m *mockAuthorizer) KeyRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyWrite checks for permission to write a given key
|
|
||||||
func (m *mockAuthorizer) KeyWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyWritePrefix checks for permission to write to an
|
|
||||||
// entire key prefix. This means there must be no sub-policies
|
|
||||||
// that deny a write.
|
|
||||||
func (m *mockAuthorizer) KeyWritePrefix(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyringRead determines if the encryption keyring used in
|
|
||||||
// the gossip layer can be read.
|
|
||||||
func (m *mockAuthorizer) KeyringRead(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyringWrite determines if the keyring can be manipulated
|
|
||||||
func (m *mockAuthorizer) KeyringWrite(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NodeRead checks for permission to read (discover) a given node.
|
|
||||||
func (m *mockAuthorizer) NodeRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAuthorizer) NodeReadAll(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NodeWrite checks for permission to create or update (register) a
|
|
||||||
// given node.
|
|
||||||
func (m *mockAuthorizer) NodeWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAuthorizer) MeshRead(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAuthorizer) MeshWrite(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PeeringRead determines if the read-only Consul peering functions
|
|
||||||
// can be used.
|
|
||||||
func (m *mockAuthorizer) PeeringRead(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PeeringWrite determines if the state-changing Consul peering
|
|
||||||
// functions can be used.
|
|
||||||
func (m *mockAuthorizer) PeeringWrite(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OperatorRead determines if the read-only Consul operator functions
|
|
||||||
// can be used. ret := m.Called(segment, ctx)
|
|
||||||
func (m *mockAuthorizer) OperatorRead(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OperatorWrite determines if the state-changing Consul operator
|
|
||||||
// functions can be used.
|
|
||||||
func (m *mockAuthorizer) OperatorWrite(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PreparedQueryRead determines if a specific prepared query can be read
|
|
||||||
// to show its contents (this is not used for execution).
|
|
||||||
func (m *mockAuthorizer) PreparedQueryRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PreparedQueryWrite determines if a specific prepared query can be
|
|
||||||
// created, modified, or deleted.
|
|
||||||
func (m *mockAuthorizer) PreparedQueryWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServiceRead checks for permission to read a given service
|
|
||||||
func (m *mockAuthorizer) ServiceRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockAuthorizer) ServiceReadAll(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServiceWrite checks for permission to create or update a given
|
|
||||||
// service
|
|
||||||
func (m *mockAuthorizer) ServiceWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServiceWriteAny checks for service:write on any service
|
|
||||||
func (m *mockAuthorizer) ServiceWriteAny(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SessionRead checks for permission to read sessions for a given node.
|
|
||||||
func (m *mockAuthorizer) SessionRead(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SessionWrite checks for permission to create sessions for a given
|
|
||||||
// node.
|
|
||||||
func (m *mockAuthorizer) SessionWrite(segment string, ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(segment, ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot checks for permission to take and restore snapshots.
|
|
||||||
func (m *mockAuthorizer) Snapshot(ctx *AuthorizerContext) EnforcementDecision {
|
|
||||||
ret := m.Called(ctx)
|
|
||||||
return ret.Get(0).(EnforcementDecision)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mockAuthorizer) ToAllowAuthorizer() AllowAuthorizer {
|
|
||||||
return AllowAuthorizer{Authorizer: p}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestACL_Enforce(t *testing.T) {
|
func TestACL_Enforce(t *testing.T) {
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
method string
|
method string
|
||||||
|
@ -664,7 +443,7 @@ func TestACL_Enforce(t *testing.T) {
|
||||||
|
|
||||||
for _, tcase := range cases {
|
for _, tcase := range cases {
|
||||||
t.Run(testName(tcase), func(t *testing.T) {
|
t.Run(testName(tcase), func(t *testing.T) {
|
||||||
m := &mockAuthorizer{}
|
m := &MockAuthorizer{}
|
||||||
|
|
||||||
if tcase.err == "" {
|
if tcase.err == "" {
|
||||||
var nilCtx *AuthorizerContext
|
var nilCtx *AuthorizerContext
|
||||||
|
|
|
@ -16,7 +16,7 @@ func TestPermissionDeniedError(t *testing.T) {
|
||||||
return t.expected
|
return t.expected
|
||||||
}
|
}
|
||||||
|
|
||||||
auth1 := mockAuthorizer{}
|
auth1 := MockAuthorizer{}
|
||||||
|
|
||||||
cases := []testCase{
|
cases := []testCase{
|
||||||
{
|
{
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/hashicorp/consul/proto/pboperator"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -382,6 +383,8 @@ type Agent struct {
|
||||||
|
|
||||||
rpcClientPeering pbpeering.PeeringServiceClient
|
rpcClientPeering pbpeering.PeeringServiceClient
|
||||||
|
|
||||||
|
rpcClientOperator pboperator.OperatorServiceClient
|
||||||
|
|
||||||
// routineManager is responsible for managing longer running go routines
|
// routineManager is responsible for managing longer running go routines
|
||||||
// run by the Agent
|
// run by the Agent
|
||||||
routineManager *routine.Manager
|
routineManager *routine.Manager
|
||||||
|
@ -467,6 +470,7 @@ func New(bd BaseDeps) (*Agent, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
a.rpcClientPeering = pbpeering.NewPeeringServiceClient(conn)
|
a.rpcClientPeering = pbpeering.NewPeeringServiceClient(conn)
|
||||||
|
a.rpcClientOperator = pboperator.NewOperatorServiceClient(conn)
|
||||||
|
|
||||||
a.serviceManager = NewServiceManager(&a)
|
a.serviceManager = NewServiceManager(&a)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/acl/resolver"
|
||||||
|
"github.com/hashicorp/consul/agent/rpc/operator"
|
||||||
|
"github.com/hashicorp/consul/proto/pboperator"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
)
|
||||||
|
|
||||||
|
type OperatorBackend struct {
|
||||||
|
srv *Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewOperatorBackend returns a operator.Backend implementation that is bound to the given server.
|
||||||
|
func NewOperatorBackend(srv *Server) *OperatorBackend {
|
||||||
|
return &OperatorBackend{
|
||||||
|
srv: srv,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (op *OperatorBackend) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) {
|
||||||
|
return op.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzCtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (op *OperatorBackend) TransferLeader(_ context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) {
|
||||||
|
reply := new(pboperator.TransferLeaderResponse)
|
||||||
|
err := op.srv.attemptLeadershipTransfer(raft.ServerID(request.ID))
|
||||||
|
reply.Success = err == nil
|
||||||
|
return reply, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ operator.Backend = (*OperatorBackend)(nil)
|
|
@ -0,0 +1,193 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
external "github.com/hashicorp/consul/agent/grpc-external"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pboperator"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
gogrpc "google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
|
"github.com/hashicorp/consul/testrpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOperatorBackend_TransferLeader(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
conf := testClusterConfig{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Servers: 3,
|
||||||
|
ServerConf: func(config *Config) {
|
||||||
|
config.RaftConfig.HeartbeatTimeout = 2 * time.Second
|
||||||
|
config.RaftConfig.ElectionTimeout = 2 * time.Second
|
||||||
|
config.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := newTestCluster(t, &conf)
|
||||||
|
s1 := nodes.Servers[0]
|
||||||
|
// Make sure a leader is elected
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Make a write call to server2 and make sure it gets forwarded to server1
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
// Dial server2 directly
|
||||||
|
conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(),
|
||||||
|
gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
|
||||||
|
gogrpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
gogrpc.WithBlock())
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { conn.Close() })
|
||||||
|
|
||||||
|
operatorClient := pboperator.NewOperatorServiceClient(conn)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "transfer leader", func(t *testing.T) {
|
||||||
|
beforeLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, beforeLeader)
|
||||||
|
// Do the grpc Write call to server2
|
||||||
|
req := pboperator.TransferLeaderRequest{
|
||||||
|
ID: "",
|
||||||
|
}
|
||||||
|
reply, err := operatorClient.TransferLeader(ctx, &req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, reply.Success)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(r, afterLeader)
|
||||||
|
})
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, afterLeader)
|
||||||
|
if afterLeader == beforeLeader {
|
||||||
|
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOperatorBackend_TransferLeaderWithACL(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
conf := testClusterConfig{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Servers: 3,
|
||||||
|
ServerConf: func(config *Config) {
|
||||||
|
config.RaftConfig.HeartbeatTimeout = 2 * time.Second
|
||||||
|
config.RaftConfig.ElectionTimeout = 2 * time.Second
|
||||||
|
config.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
|
||||||
|
config.ACLsEnabled = true
|
||||||
|
config.ACLInitialManagementToken = "root"
|
||||||
|
config.ACLResolverSettings.ACLDefaultPolicy = "deny"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := newTestCluster(t, &conf)
|
||||||
|
s1 := nodes.Servers[0]
|
||||||
|
// Make sure a leader is elected
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Make a write call to server2 and make sure it gets forwarded to server1
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
// Dial server2 directly
|
||||||
|
conn, err := gogrpc.DialContext(ctx, s1.config.RPCAddr.String(),
|
||||||
|
gogrpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
|
||||||
|
gogrpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
gogrpc.WithBlock())
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { conn.Close() })
|
||||||
|
|
||||||
|
operatorClient := pboperator.NewOperatorServiceClient(conn)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "transfer leader no token", func(t *testing.T) {
|
||||||
|
beforeLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, beforeLeader)
|
||||||
|
// Do the grpc Write call to server2
|
||||||
|
req := pboperator.TransferLeaderRequest{
|
||||||
|
ID: "",
|
||||||
|
}
|
||||||
|
reply, err := operatorClient.TransferLeader(ctx, &req)
|
||||||
|
require.True(t, acl.IsErrPermissionDenied(err))
|
||||||
|
require.Nil(t, reply)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(r, afterLeader)
|
||||||
|
})
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, afterLeader)
|
||||||
|
if afterLeader != beforeLeader {
|
||||||
|
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "transfer leader operator read token", func(t *testing.T) {
|
||||||
|
|
||||||
|
beforeLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, beforeLeader)
|
||||||
|
// Do the grpc Write call to server2
|
||||||
|
req := pboperator.TransferLeaderRequest{
|
||||||
|
ID: "",
|
||||||
|
}
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
rules := `operator = "read"`
|
||||||
|
tokenRead := createToken(t, codec, rules)
|
||||||
|
|
||||||
|
ctxToken, err := external.ContextWithQueryOptions(ctx, structs.QueryOptions{Token: tokenRead})
|
||||||
|
require.NoError(t, err)
|
||||||
|
reply, err := operatorClient.TransferLeader(ctxToken, &req)
|
||||||
|
require.True(t, acl.IsErrPermissionDenied(err))
|
||||||
|
require.Nil(t, reply)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(r, afterLeader)
|
||||||
|
})
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, afterLeader)
|
||||||
|
if afterLeader != beforeLeader {
|
||||||
|
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "transfer leader operator write token", func(t *testing.T) {
|
||||||
|
|
||||||
|
beforeLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, beforeLeader)
|
||||||
|
// Do the grpc Write call to server2
|
||||||
|
req := pboperator.TransferLeaderRequest{
|
||||||
|
ID: "",
|
||||||
|
}
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
rules := `operator = "write"`
|
||||||
|
tokenWrite := createTokenWithPolicyNameFull(t, codec, "the-policy-write", rules, "root")
|
||||||
|
ctxToken, err := external.ContextWithQueryOptions(ctx, structs.QueryOptions{Token: tokenWrite.SecretID})
|
||||||
|
require.NoError(t, err)
|
||||||
|
reply, err := operatorClient.TransferLeader(ctxToken, &req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, reply.Success)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(r, afterLeader)
|
||||||
|
})
|
||||||
|
afterLeader, _ := s1.raft.LeaderWithID()
|
||||||
|
require.NotEmpty(t, afterLeader)
|
||||||
|
if afterLeader == beforeLeader {
|
||||||
|
t.Fatalf("leader should have changed %s == %s", afterLeader, beforeLeader)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/hashicorp/go-connlimit"
|
"github.com/hashicorp/go-connlimit"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/go-version"
|
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
autopilot "github.com/hashicorp/raft-autopilot"
|
autopilot "github.com/hashicorp/raft-autopilot"
|
||||||
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
|
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
|
||||||
|
@ -51,6 +50,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
"github.com/hashicorp/consul/agent/router"
|
||||||
"github.com/hashicorp/consul/agent/rpc/middleware"
|
"github.com/hashicorp/consul/agent/rpc/middleware"
|
||||||
|
"github.com/hashicorp/consul/agent/rpc/operator"
|
||||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
|
@ -370,6 +370,9 @@ type Server struct {
|
||||||
// peeringBackend is shared between the external and internal gRPC services for peering
|
// peeringBackend is shared between the external and internal gRPC services for peering
|
||||||
peeringBackend *PeeringBackend
|
peeringBackend *PeeringBackend
|
||||||
|
|
||||||
|
// operatorBackend is shared between the external and internal gRPC services for peering
|
||||||
|
operatorBackend *OperatorBackend
|
||||||
|
|
||||||
// peerStreamServer is a server used to handle peering streams from external clusters.
|
// peerStreamServer is a server used to handle peering streams from external clusters.
|
||||||
peerStreamServer *peerstream.Server
|
peerStreamServer *peerstream.Server
|
||||||
|
|
||||||
|
@ -385,6 +388,7 @@ type Server struct {
|
||||||
|
|
||||||
// embedded struct to hold all the enterprise specific data
|
// embedded struct to hold all the enterprise specific data
|
||||||
EnterpriseServer
|
EnterpriseServer
|
||||||
|
operatorServer *operator.Server
|
||||||
}
|
}
|
||||||
type connHandler interface {
|
type connHandler interface {
|
||||||
Run() error
|
Run() error
|
||||||
|
@ -739,6 +743,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
|
||||||
}).Register(s.externalGRPCServer)
|
}).Register(s.externalGRPCServer)
|
||||||
|
|
||||||
s.peeringBackend = NewPeeringBackend(s)
|
s.peeringBackend = NewPeeringBackend(s)
|
||||||
|
s.operatorBackend = NewOperatorBackend(s)
|
||||||
s.peerStreamServer = peerstream.NewServer(peerstream.Config{
|
s.peerStreamServer = peerstream.NewServer(peerstream.Config{
|
||||||
Backend: s.peeringBackend,
|
Backend: s.peeringBackend,
|
||||||
GetStore: func() peerstream.StateStore { return s.FSM().State() },
|
GetStore: func() peerstream.StateStore { return s.FSM().State() },
|
||||||
|
@ -826,6 +831,19 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
|
||||||
PeeringEnabled: config.PeeringEnabled,
|
PeeringEnabled: config.PeeringEnabled,
|
||||||
})
|
})
|
||||||
s.peeringServer = p
|
s.peeringServer = p
|
||||||
|
o := operator.NewServer(operator.Config{
|
||||||
|
Backend: s.operatorBackend,
|
||||||
|
Logger: deps.Logger.Named("grpc-api.operator"),
|
||||||
|
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||||
|
// Only forward the request if the dc in the request matches the server's datacenter.
|
||||||
|
if info.RequestDatacenter() != "" && info.RequestDatacenter() != config.Datacenter {
|
||||||
|
return false, fmt.Errorf("requests to transfer leader cannot be forwarded to remote datacenters")
|
||||||
|
}
|
||||||
|
return s.ForwardGRPC(s.grpcConnPool, info, fn)
|
||||||
|
},
|
||||||
|
Datacenter: config.Datacenter,
|
||||||
|
})
|
||||||
|
s.operatorServer = o
|
||||||
|
|
||||||
register := func(srv *grpc.Server) {
|
register := func(srv *grpc.Server) {
|
||||||
if config.RPCConfig.EnableStreaming {
|
if config.RPCConfig.EnableStreaming {
|
||||||
|
@ -834,6 +852,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
|
||||||
deps.Logger.Named("grpc-api.subscription")))
|
deps.Logger.Named("grpc-api.subscription")))
|
||||||
}
|
}
|
||||||
s.peeringServer.Register(srv)
|
s.peeringServer.Register(srv)
|
||||||
|
s.operatorServer.Register(srv)
|
||||||
s.registerEnterpriseGRPCServices(deps, srv)
|
s.registerEnterpriseGRPCServices(deps, srv)
|
||||||
|
|
||||||
// Note: these external gRPC services are also exposed on the internal server to
|
// Note: these external gRPC services are also exposed on the internal server to
|
||||||
|
@ -1193,20 +1212,25 @@ func (s *Server) Shutdown() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) attemptLeadershipTransfer() (success bool) {
|
func (s *Server) attemptLeadershipTransfer(id raft.ServerID) (err error) {
|
||||||
leadershipTransferVersion := version.Must(version.NewVersion(LeaderTransferMinVersion))
|
var addr raft.ServerAddress
|
||||||
|
if id != "" {
|
||||||
ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, leadershipTransferVersion)
|
addr, err = s.serverLookup.ServerAddr(id)
|
||||||
if !ok {
|
if err != nil {
|
||||||
return false
|
return err
|
||||||
|
}
|
||||||
|
future := s.raft.LeadershipTransferToServer(id, addr)
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
future := s.raft.LeadershipTransfer()
|
||||||
|
if err := future.Error(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
future := s.raft.LeadershipTransfer()
|
return nil
|
||||||
if err := future.Error(); err != nil {
|
|
||||||
s.logger.Error("failed to transfer leadership, removing the server", "error", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Leave is used to prepare for a graceful shutdown.
|
// Leave is used to prepare for a graceful shutdown.
|
||||||
|
@ -1228,7 +1252,7 @@ func (s *Server) Leave() error {
|
||||||
// removed for some reasonable period of time.
|
// removed for some reasonable period of time.
|
||||||
isLeader := s.IsLeader()
|
isLeader := s.IsLeader()
|
||||||
if isLeader && numPeers > 1 {
|
if isLeader && numPeers > 1 {
|
||||||
if s.attemptLeadershipTransfer() {
|
if err := s.attemptLeadershipTransfer(""); err == nil {
|
||||||
isLeader = false
|
isLeader = false
|
||||||
} else {
|
} else {
|
||||||
future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0)
|
future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0)
|
||||||
|
|
|
@ -99,6 +99,7 @@ func init() {
|
||||||
registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPHandlers).ACLAuthorize)
|
registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPHandlers).ACLAuthorize)
|
||||||
registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPHandlers).KVSEndpoint)
|
registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPHandlers).KVSEndpoint)
|
||||||
registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPHandlers).OperatorRaftConfiguration)
|
registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPHandlers).OperatorRaftConfiguration)
|
||||||
|
registerEndpoint("/v1/operator/raft/transfer-leader", []string{"POST"}, (*HTTPHandlers).OperatorRaftTransferLeader)
|
||||||
registerEndpoint("/v1/operator/raft/peer", []string{"DELETE"}, (*HTTPHandlers).OperatorRaftPeer)
|
registerEndpoint("/v1/operator/raft/peer", []string{"DELETE"}, (*HTTPHandlers).OperatorRaftPeer)
|
||||||
registerEndpoint("/v1/operator/keyring", []string{"GET", "POST", "PUT", "DELETE"}, (*HTTPHandlers).OperatorKeyringEndpoint)
|
registerEndpoint("/v1/operator/keyring", []string{"GET", "POST", "PUT", "DELETE"}, (*HTTPHandlers).OperatorKeyringEndpoint)
|
||||||
registerEndpoint("/v1/operator/autopilot/configuration", []string{"GET", "PUT"}, (*HTTPHandlers).OperatorAutopilotConfiguration)
|
registerEndpoint("/v1/operator/autopilot/configuration", []string{"GET", "PUT"}, (*HTTPHandlers).OperatorAutopilotConfiguration)
|
||||||
|
|
|
@ -2,6 +2,8 @@ package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
external "github.com/hashicorp/consul/agent/grpc-external"
|
||||||
|
"github.com/hashicorp/consul/proto/pboperator"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
@ -31,6 +33,43 @@ func (s *HTTPHandlers) OperatorRaftConfiguration(resp http.ResponseWriter, req *
|
||||||
return reply, nil
|
return reply, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OperatorRaftTransferLeader is used to transfer raft cluster leadership to another node
|
||||||
|
func (s *HTTPHandlers) OperatorRaftTransferLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
|
|
||||||
|
var entMeta acl.EnterpriseMeta
|
||||||
|
if err := s.parseEntMetaPartition(req, &entMeta); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
_, hasID := params["id"]
|
||||||
|
ID := ""
|
||||||
|
if hasID {
|
||||||
|
ID = params.Get("id")
|
||||||
|
}
|
||||||
|
args := pboperator.TransferLeaderRequest{
|
||||||
|
ID: ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
var token string
|
||||||
|
s.parseToken(req, &token)
|
||||||
|
ctx, err := external.ContextWithQueryOptions(req.Context(), structs.QueryOptions{Token: token})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := s.agent.rpcClientOperator.TransferLeader(ctx, &args)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if result.Success != true {
|
||||||
|
return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("Failed to transfer Leader: %s", err.Error())}
|
||||||
|
}
|
||||||
|
reply := new(api.TransferLeaderResponse)
|
||||||
|
pboperator.TransferLeaderResponseToAPI(result, reply)
|
||||||
|
return reply, nil
|
||||||
|
}
|
||||||
|
|
||||||
// OperatorRaftPeer supports actions on Raft peers. Currently we only support
|
// OperatorRaftPeer supports actions on Raft peers. Currently we only support
|
||||||
// removing peers by address.
|
// removing peers by address.
|
||||||
func (s *HTTPHandlers) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPHandlers) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
package operator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/acl/resolver"
|
||||||
|
external "github.com/hashicorp/consul/agent/grpc-external"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pboperator"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// For private/internal gRPC handlers, protoc-gen-rpc-glue generates the
|
||||||
|
// requisite methods to satisfy the structs.RPCInfo interface using fields
|
||||||
|
// from the pbcommon package. This service is public, so we can't use those
|
||||||
|
// fields in our proto definition. Instead, we construct our RPCInfo manually.
|
||||||
|
var writeRequest struct {
|
||||||
|
structs.WriteRequest
|
||||||
|
structs.DCSpecificRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
var readRequest struct {
|
||||||
|
structs.QueryOptions
|
||||||
|
structs.DCSpecificRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
// Server implements pboperator.OperatorService to provide RPC operations for
|
||||||
|
// managing operator operation.
|
||||||
|
type Server struct {
|
||||||
|
Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) {
|
||||||
|
resp := &pboperator.TransferLeaderResponse{Success: false}
|
||||||
|
handled, err := s.ForwardRPC(&writeRequest, func(conn *grpc.ClientConn) error {
|
||||||
|
ctx := external.ForwardMetadataContext(ctx)
|
||||||
|
var err error
|
||||||
|
resp, err = pboperator.NewOperatorServiceClient(conn).TransferLeader(ctx, request)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if handled || err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var authzCtx acl.AuthorizerContext
|
||||||
|
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||||
|
|
||||||
|
options, err := external.QueryOptionsFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
||||||
|
if err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := authz.ToAllowAuthorizer().OperatorWriteAllowed(&authzCtx); err != nil {
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.Backend.TransferLeader(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Backend Backend
|
||||||
|
Logger hclog.Logger
|
||||||
|
ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
|
||||||
|
Datacenter string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServer(cfg Config) *Server {
|
||||||
|
requireNotNil(cfg.Backend, "Backend")
|
||||||
|
requireNotNil(cfg.Logger, "Logger")
|
||||||
|
requireNotNil(cfg.ForwardRPC, "ForwardRPC")
|
||||||
|
if cfg.Datacenter == "" {
|
||||||
|
panic("Datacenter is required")
|
||||||
|
}
|
||||||
|
return &Server{
|
||||||
|
Config: cfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func requireNotNil(v interface{}, name string) {
|
||||||
|
if v == nil {
|
||||||
|
panic(name + " is required")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ pboperator.OperatorServiceServer = (*Server)(nil)
|
||||||
|
|
||||||
|
func (s *Server) Register(grpcServer *grpc.Server) {
|
||||||
|
pboperator.RegisterOperatorServiceServer(grpcServer, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backend defines the core integrations the Operator endpoint depends on. A
|
||||||
|
// functional implementation will integrate with various operator operation such as
|
||||||
|
// raft, autopilot operation. The only currently implemented operation is raft leader transfer
|
||||||
|
type Backend interface {
|
||||||
|
TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error)
|
||||||
|
ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error)
|
||||||
|
}
|
|
@ -0,0 +1,104 @@
|
||||||
|
package operator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/acl/resolver"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pboperator"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MockBackend struct {
|
||||||
|
mock.Mock
|
||||||
|
authorizer acl.Authorizer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockBackend) TransferLeader(ctx context.Context, request *pboperator.TransferLeaderRequest) (*pboperator.TransferLeaderResponse, error) {
|
||||||
|
called := m.Called(ctx, request)
|
||||||
|
ret := called.Get(0)
|
||||||
|
if ret == nil {
|
||||||
|
return nil, called.Error(1)
|
||||||
|
}
|
||||||
|
return ret.(*pboperator.TransferLeaderResponse), called.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockBackend) ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error) {
|
||||||
|
return resolver.Result{Authorizer: m.authorizer}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeaderTransfer_ACL_Deny(t *testing.T) {
|
||||||
|
authorizer := acl.MockAuthorizer{}
|
||||||
|
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Deny)
|
||||||
|
server := NewServer(Config{Datacenter: "dc1", Backend: &MockBackend{authorizer: &authorizer}, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
|
||||||
|
|
||||||
|
_, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, "Permission denied: provided token lacks permission 'operator:write'", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeaderTransfer_ACL_Allowed(t *testing.T) {
|
||||||
|
authorizer := &acl.MockAuthorizer{}
|
||||||
|
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
|
||||||
|
|
||||||
|
backend := &MockBackend{authorizer: authorizer}
|
||||||
|
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(nil, nil)
|
||||||
|
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
|
||||||
|
|
||||||
|
_, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeaderTransfer_LeaderTransfer_Fail(t *testing.T) {
|
||||||
|
authorizer := &acl.MockAuthorizer{}
|
||||||
|
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
|
||||||
|
|
||||||
|
backend := &MockBackend{authorizer: authorizer}
|
||||||
|
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("test"))
|
||||||
|
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
|
||||||
|
|
||||||
|
_, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, "test", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeaderTransfer_LeaderTransfer_Success(t *testing.T) {
|
||||||
|
authorizer := &acl.MockAuthorizer{}
|
||||||
|
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
|
||||||
|
|
||||||
|
backend := &MockBackend{authorizer: authorizer}
|
||||||
|
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(&pboperator.TransferLeaderResponse{Success: true}, nil)
|
||||||
|
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: doForwardRPC})
|
||||||
|
|
||||||
|
ret, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, ret)
|
||||||
|
require.True(t, ret.Success)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLeaderTransfer_LeaderTransfer_ForwardRPC(t *testing.T) {
|
||||||
|
authorizer := &acl.MockAuthorizer{}
|
||||||
|
authorizer.On("OperatorWrite", mock.Anything).Return(acl.Allow)
|
||||||
|
|
||||||
|
backend := &MockBackend{authorizer: authorizer}
|
||||||
|
backend.On("TransferLeader", mock.Anything, mock.Anything).Return(&pboperator.TransferLeaderResponse{}, nil)
|
||||||
|
server := NewServer(Config{Datacenter: "dc1", Backend: backend, Logger: hclog.New(nil), ForwardRPC: noopForwardRPC})
|
||||||
|
|
||||||
|
ret, err := server.TransferLeader(context.Background(), &pboperator.TransferLeaderRequest{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, ret)
|
||||||
|
require.False(t, ret.Success)
|
||||||
|
}
|
||||||
|
func noopForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func doForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
|
@ -36,6 +36,11 @@ type RaftConfiguration struct {
|
||||||
Index uint64
|
Index uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TransferLeaderResponse is returned when querying for the current Raft configuration.
|
||||||
|
type TransferLeaderResponse struct {
|
||||||
|
Success bool
|
||||||
|
}
|
||||||
|
|
||||||
// RaftGetConfiguration is used to query the current Raft peer set.
|
// RaftGetConfiguration is used to query the current Raft peer set.
|
||||||
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
|
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
|
||||||
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
|
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
|
||||||
|
@ -56,6 +61,26 @@ func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, e
|
||||||
return &out, nil
|
return &out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RaftLeaderTransfer is used to transfer the current raft leader to another node
|
||||||
|
func (op *Operator) RaftLeaderTransfer(q *QueryOptions) (*TransferLeaderResponse, error) {
|
||||||
|
r := op.c.newRequest("POST", "/v1/operator/raft/transfer-leader")
|
||||||
|
r.setQueryOptions(q)
|
||||||
|
_, resp, err := op.c.doRequest(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer closeResponseBody(resp)
|
||||||
|
if err := requireOK(resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var out TransferLeaderResponse
|
||||||
|
if err := decodeBody(resp, &out); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
|
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
|
||||||
// quorum but no longer known to Serf or the catalog) by address in the form of
|
// quorum but no longer known to Serf or the catalog) by address in the form of
|
||||||
// "IP:port".
|
// "IP:port".
|
||||||
|
|
|
@ -36,3 +36,21 @@ func TestAPI_OperatorRaftRemovePeerByAddress(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAPI_OperatorRaftLeaderTransfer(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
c, s := makeClient(t)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
// If we get this error, it proves we sent the address all the way
|
||||||
|
// through.
|
||||||
|
operator := c.Operator()
|
||||||
|
transfer, err := operator.RaftLeaderTransfer(nil)
|
||||||
|
if err == nil || !strings.Contains(err.Error(),
|
||||||
|
"cannot find peer") {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if transfer != nil {
|
||||||
|
t.Fatalf("err:%v", transfer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
package transferleader
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/consul/command/flags"
|
||||||
|
"github.com/mitchellh/cli"
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(ui cli.Ui) *cmd {
|
||||||
|
c := &cmd{UI: ui}
|
||||||
|
c.init()
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
type cmd struct {
|
||||||
|
UI cli.Ui
|
||||||
|
flags *flag.FlagSet
|
||||||
|
http *flags.HTTPFlags
|
||||||
|
help string
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmd) init() {
|
||||||
|
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
|
||||||
|
c.http = &flags.HTTPFlags{}
|
||||||
|
c.flags.StringVar(&c.id, "id", "",
|
||||||
|
"The ID to remove from the Raft configuration.")
|
||||||
|
flags.Merge(c.flags, c.http.ClientFlags())
|
||||||
|
flags.Merge(c.flags, c.http.ServerFlags())
|
||||||
|
c.help = flags.Usage(help, c.flags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmd) Run(args []string) int {
|
||||||
|
if err := c.flags.Parse(args); err != nil {
|
||||||
|
if err == flag.ErrHelp {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
c.UI.Error(fmt.Sprintf("Failed to parse args: %v", err))
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up a client.
|
||||||
|
client, err := c.http.APIClient()
|
||||||
|
if err != nil {
|
||||||
|
c.UI.Error(fmt.Sprintf("Error initializing client: %s", err))
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch the current configuration.
|
||||||
|
result, err := raftTransferLeader(client, c.http.Stale())
|
||||||
|
if err != nil {
|
||||||
|
c.UI.Error(fmt.Sprintf("Error transfering leadership: %v", err))
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
c.UI.Output(result)
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func raftTransferLeader(client *api.Client, stale bool) (string, error) {
|
||||||
|
q := &api.QueryOptions{
|
||||||
|
AllowStale: stale,
|
||||||
|
}
|
||||||
|
reply, err := client.Operator().RaftLeaderTransfer(q)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("Failed to transfer leadership %w", err)
|
||||||
|
}
|
||||||
|
if !reply.Success {
|
||||||
|
return "", fmt.Errorf("Failed to transfer leadership")
|
||||||
|
}
|
||||||
|
return "Success", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmd) Synopsis() string {
|
||||||
|
return synopsis
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cmd) Help() string {
|
||||||
|
return c.help
|
||||||
|
}
|
||||||
|
|
||||||
|
const synopsis = "Transfer raft leadership to another node"
|
||||||
|
const help = `
|
||||||
|
Usage: consul operator raft transfer-leader [options]
|
||||||
|
|
||||||
|
Transfer raft leadership to another node.
|
||||||
|
`
|
|
@ -0,0 +1,43 @@
|
||||||
|
package transferleader
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/agent"
|
||||||
|
"github.com/mitchellh/cli"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestOperatorRaftTransferLeaderCommand_noTabs(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
if strings.ContainsRune(New(cli.NewMockUi()).Help(), '\t') {
|
||||||
|
t.Fatal("help has tabs")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This only test that the command behave correctly when only one agent is present
|
||||||
|
// and no leadership transfer is possible, testing for the functionality will be done at the RPC level.
|
||||||
|
func TestOperatorRaftTransferLeaderWithSingleNode(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
a := agent.NewTestAgent(t, ``)
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
expected := "cannot find peer"
|
||||||
|
|
||||||
|
// Test the transfer-leader subcommand directly
|
||||||
|
ui := cli.NewMockUi()
|
||||||
|
c := New(ui)
|
||||||
|
|
||||||
|
args := []string{"-http-addr=" + a.HTTPAddr()}
|
||||||
|
code := c.Run(args)
|
||||||
|
if code != 1 {
|
||||||
|
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||||
|
}
|
||||||
|
output := strings.TrimSpace(ui.ErrorWriter.String())
|
||||||
|
if !strings.Contains(output, expected) {
|
||||||
|
t.Fatalf("bad: %q, %q", output, expected)
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package command
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/hashicorp/consul/command/operator/raft/transferleader"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
@ -220,6 +221,7 @@ func RegisteredCommands(ui cli.Ui) map[string]mcli.CommandFactory {
|
||||||
entry{"operator raft", func(cli.Ui) (cli.Command, error) { return operraft.New(), nil }},
|
entry{"operator raft", func(cli.Ui) (cli.Command, error) { return operraft.New(), nil }},
|
||||||
entry{"operator raft list-peers", func(ui cli.Ui) (cli.Command, error) { return operraftlist.New(ui), nil }},
|
entry{"operator raft list-peers", func(ui cli.Ui) (cli.Command, error) { return operraftlist.New(ui), nil }},
|
||||||
entry{"operator raft remove-peer", func(ui cli.Ui) (cli.Command, error) { return operraftremove.New(ui), nil }},
|
entry{"operator raft remove-peer", func(ui cli.Ui) (cli.Command, error) { return operraftremove.New(ui), nil }},
|
||||||
|
entry{"operator raft transfer-leader", func(ui cli.Ui) (cli.Command, error) { return transferleader.New(ui), nil }},
|
||||||
entry{"peering", func(cli.Ui) (cli.Command, error) { return peering.New(), nil }},
|
entry{"peering", func(cli.Ui) (cli.Command, error) { return peering.New(), nil }},
|
||||||
entry{"peering delete", func(ui cli.Ui) (cli.Command, error) { return peerdelete.New(ui), nil }},
|
entry{"peering delete", func(ui cli.Ui) (cli.Command, error) { return peerdelete.New(ui), nil }},
|
||||||
entry{"peering generate-token", func(ui cli.Ui) (cli.Command, error) { return peergenerate.New(ui), nil }},
|
entry{"peering generate-token", func(ui cli.Ui) (cli.Command, error) { return peergenerate.New(ui), nil }},
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
// Code generated by mog. DO NOT EDIT.
|
||||||
|
|
||||||
|
package pboperator
|
||||||
|
|
||||||
|
import "github.com/hashicorp/consul/api"
|
||||||
|
|
||||||
|
func TransferLeaderResponseToAPI(s *TransferLeaderResponse, t *api.TransferLeaderResponse) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Success = s.Success
|
||||||
|
}
|
||||||
|
func TransferLeaderResponseFromAPI(t *api.TransferLeaderResponse, s *TransferLeaderResponse) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.Success = t.Success
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
|
||||||
|
// source: proto/pboperator/operator.proto
|
||||||
|
|
||||||
|
package pboperator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *TransferLeaderRequest) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *TransferLeaderRequest) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *TransferLeaderResponse) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *TransferLeaderResponse) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
|
@ -0,0 +1,242 @@
|
||||||
|
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// protoc-gen-go v1.26.0
|
||||||
|
// protoc (unknown)
|
||||||
|
// source: proto/pboperator/operator.proto
|
||||||
|
|
||||||
|
package pboperator
|
||||||
|
|
||||||
|
import (
|
||||||
|
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||||
|
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||||
|
reflect "reflect"
|
||||||
|
sync "sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Verify that this generated code is sufficiently up-to-date.
|
||||||
|
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
|
||||||
|
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||||
|
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
|
||||||
|
)
|
||||||
|
|
||||||
|
type TransferLeaderRequest struct {
|
||||||
|
state protoimpl.MessageState
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
|
||||||
|
ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *TransferLeaderRequest) Reset() {
|
||||||
|
*x = TransferLeaderRequest{}
|
||||||
|
if protoimpl.UnsafeEnabled {
|
||||||
|
mi := &file_proto_pboperator_operator_proto_msgTypes[0]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *TransferLeaderRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*TransferLeaderRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *TransferLeaderRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_proto_pboperator_operator_proto_msgTypes[0]
|
||||||
|
if protoimpl.UnsafeEnabled && x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use TransferLeaderRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*TransferLeaderRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_proto_pboperator_operator_proto_rawDescGZIP(), []int{0}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *TransferLeaderRequest) GetID() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.ID
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// mog annotation:
|
||||||
|
//
|
||||||
|
// target=github.com/hashicorp/consul/api.TransferLeaderResponse
|
||||||
|
// output=operator.gen.go
|
||||||
|
// name=API
|
||||||
|
type TransferLeaderResponse struct {
|
||||||
|
state protoimpl.MessageState
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
|
||||||
|
// true if the transfer is a success
|
||||||
|
Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *TransferLeaderResponse) Reset() {
|
||||||
|
*x = TransferLeaderResponse{}
|
||||||
|
if protoimpl.UnsafeEnabled {
|
||||||
|
mi := &file_proto_pboperator_operator_proto_msgTypes[1]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *TransferLeaderResponse) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*TransferLeaderResponse) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *TransferLeaderResponse) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_proto_pboperator_operator_proto_msgTypes[1]
|
||||||
|
if protoimpl.UnsafeEnabled && x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use TransferLeaderResponse.ProtoReflect.Descriptor instead.
|
||||||
|
func (*TransferLeaderResponse) Descriptor() ([]byte, []int) {
|
||||||
|
return file_proto_pboperator_operator_proto_rawDescGZIP(), []int{1}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *TransferLeaderResponse) GetSuccess() bool {
|
||||||
|
if x != nil {
|
||||||
|
return x.Success
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var File_proto_pboperator_operator_proto protoreflect.FileDescriptor
|
||||||
|
|
||||||
|
var file_proto_pboperator_operator_proto_rawDesc = []byte{
|
||||||
|
0x0a, 0x1f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74,
|
||||||
|
0x6f, 0x72, 0x2f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74,
|
||||||
|
0x6f, 0x12, 0x22, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e,
|
||||||
|
0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65,
|
||||||
|
0x72, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x27, 0x0a, 0x15, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65,
|
||||||
|
0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e,
|
||||||
|
0x0a, 0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x22, 0x32,
|
||||||
|
0x0a, 0x16, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72,
|
||||||
|
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63,
|
||||||
|
0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65,
|
||||||
|
0x73, 0x73, 0x32, 0x9d, 0x01, 0x0a, 0x0f, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x53,
|
||||||
|
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0e, 0x54, 0x72, 0x61, 0x6e, 0x73,
|
||||||
|
0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x39, 0x2e, 0x68, 0x61, 0x73, 0x68,
|
||||||
|
0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74,
|
||||||
|
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x54,
|
||||||
|
0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71,
|
||||||
|
0x75, 0x65, 0x73, 0x74, 0x1a, 0x3a, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
|
||||||
|
0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
|
||||||
|
0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66,
|
||||||
|
0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
|
||||||
|
0x22, 0x00, 0x42, 0x91, 0x02, 0x0a, 0x26, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69,
|
||||||
|
0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65,
|
||||||
|
0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x42, 0x0d, 0x4f,
|
||||||
|
0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2c,
|
||||||
|
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69,
|
||||||
|
0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
|
||||||
|
0x6f, 0x2f, 0x70, 0x62, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xa2, 0x02, 0x04, 0x48,
|
||||||
|
0x43, 0x49, 0x4f, 0xaa, 0x02, 0x22, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e,
|
||||||
|
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e,
|
||||||
|
0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xca, 0x02, 0x22, 0x48, 0x61, 0x73, 0x68, 0x69,
|
||||||
|
0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65,
|
||||||
|
0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0xe2, 0x02, 0x2e,
|
||||||
|
0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c,
|
||||||
|
0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74,
|
||||||
|
0x6f, 0x72, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02,
|
||||||
|
0x25, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73,
|
||||||
|
0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x3a, 0x3a, 0x4f, 0x70,
|
||||||
|
0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
file_proto_pboperator_operator_proto_rawDescOnce sync.Once
|
||||||
|
file_proto_pboperator_operator_proto_rawDescData = file_proto_pboperator_operator_proto_rawDesc
|
||||||
|
)
|
||||||
|
|
||||||
|
func file_proto_pboperator_operator_proto_rawDescGZIP() []byte {
|
||||||
|
file_proto_pboperator_operator_proto_rawDescOnce.Do(func() {
|
||||||
|
file_proto_pboperator_operator_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_pboperator_operator_proto_rawDescData)
|
||||||
|
})
|
||||||
|
return file_proto_pboperator_operator_proto_rawDescData
|
||||||
|
}
|
||||||
|
|
||||||
|
var file_proto_pboperator_operator_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
|
||||||
|
var file_proto_pboperator_operator_proto_goTypes = []interface{}{
|
||||||
|
(*TransferLeaderRequest)(nil), // 0: hashicorp.consul.internal.operator.TransferLeaderRequest
|
||||||
|
(*TransferLeaderResponse)(nil), // 1: hashicorp.consul.internal.operator.TransferLeaderResponse
|
||||||
|
}
|
||||||
|
var file_proto_pboperator_operator_proto_depIdxs = []int32{
|
||||||
|
0, // 0: hashicorp.consul.internal.operator.OperatorService.TransferLeader:input_type -> hashicorp.consul.internal.operator.TransferLeaderRequest
|
||||||
|
1, // 1: hashicorp.consul.internal.operator.OperatorService.TransferLeader:output_type -> hashicorp.consul.internal.operator.TransferLeaderResponse
|
||||||
|
1, // [1:2] is the sub-list for method output_type
|
||||||
|
0, // [0:1] is the sub-list for method input_type
|
||||||
|
0, // [0:0] is the sub-list for extension type_name
|
||||||
|
0, // [0:0] is the sub-list for extension extendee
|
||||||
|
0, // [0:0] is the sub-list for field type_name
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() { file_proto_pboperator_operator_proto_init() }
|
||||||
|
func file_proto_pboperator_operator_proto_init() {
|
||||||
|
if File_proto_pboperator_operator_proto != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !protoimpl.UnsafeEnabled {
|
||||||
|
file_proto_pboperator_operator_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||||
|
switch v := v.(*TransferLeaderRequest); i {
|
||||||
|
case 0:
|
||||||
|
return &v.state
|
||||||
|
case 1:
|
||||||
|
return &v.sizeCache
|
||||||
|
case 2:
|
||||||
|
return &v.unknownFields
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
file_proto_pboperator_operator_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
|
||||||
|
switch v := v.(*TransferLeaderResponse); i {
|
||||||
|
case 0:
|
||||||
|
return &v.state
|
||||||
|
case 1:
|
||||||
|
return &v.sizeCache
|
||||||
|
case 2:
|
||||||
|
return &v.unknownFields
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
type x struct{}
|
||||||
|
out := protoimpl.TypeBuilder{
|
||||||
|
File: protoimpl.DescBuilder{
|
||||||
|
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||||
|
RawDescriptor: file_proto_pboperator_operator_proto_rawDesc,
|
||||||
|
NumEnums: 0,
|
||||||
|
NumMessages: 2,
|
||||||
|
NumExtensions: 0,
|
||||||
|
NumServices: 1,
|
||||||
|
},
|
||||||
|
GoTypes: file_proto_pboperator_operator_proto_goTypes,
|
||||||
|
DependencyIndexes: file_proto_pboperator_operator_proto_depIdxs,
|
||||||
|
MessageInfos: file_proto_pboperator_operator_proto_msgTypes,
|
||||||
|
}.Build()
|
||||||
|
File_proto_pboperator_operator_proto = out.File
|
||||||
|
file_proto_pboperator_operator_proto_rawDesc = nil
|
||||||
|
file_proto_pboperator_operator_proto_goTypes = nil
|
||||||
|
file_proto_pboperator_operator_proto_depIdxs = nil
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package hashicorp.consul.internal.operator;
|
||||||
|
|
||||||
|
// Operator defines a set of operators operation applicable to Consul
|
||||||
|
service OperatorService {
|
||||||
|
//Transfer raft leadership to another node
|
||||||
|
rpc TransferLeader(TransferLeaderRequest) returns (TransferLeaderResponse) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
message TransferLeaderRequest {
|
||||||
|
string ID = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// mog annotation:
|
||||||
|
//
|
||||||
|
// target=github.com/hashicorp/consul/api.TransferLeaderResponse
|
||||||
|
// output=operator.gen.go
|
||||||
|
// name=API
|
||||||
|
message TransferLeaderResponse {
|
||||||
|
// true if the transfer is a success
|
||||||
|
bool success = 1;
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// - protoc-gen-go-grpc v1.2.0
|
||||||
|
// - protoc (unknown)
|
||||||
|
// source: proto/pboperator/operator.proto
|
||||||
|
|
||||||
|
package pboperator
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
grpc "google.golang.org/grpc"
|
||||||
|
codes "google.golang.org/grpc/codes"
|
||||||
|
status "google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This is a compile-time assertion to ensure that this generated file
|
||||||
|
// is compatible with the grpc package it is being compiled against.
|
||||||
|
// Requires gRPC-Go v1.32.0 or later.
|
||||||
|
const _ = grpc.SupportPackageIsVersion7
|
||||||
|
|
||||||
|
// OperatorServiceClient is the client API for OperatorService service.
|
||||||
|
//
|
||||||
|
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||||
|
type OperatorServiceClient interface {
|
||||||
|
// Transfer raft leadership to another node
|
||||||
|
TransferLeader(ctx context.Context, in *TransferLeaderRequest, opts ...grpc.CallOption) (*TransferLeaderResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type operatorServiceClient struct {
|
||||||
|
cc grpc.ClientConnInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOperatorServiceClient(cc grpc.ClientConnInterface) OperatorServiceClient {
|
||||||
|
return &operatorServiceClient{cc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *operatorServiceClient) TransferLeader(ctx context.Context, in *TransferLeaderRequest, opts ...grpc.CallOption) (*TransferLeaderResponse, error) {
|
||||||
|
out := new(TransferLeaderResponse)
|
||||||
|
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.operator.OperatorService/TransferLeader", in, out, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// OperatorServiceServer is the server API for OperatorService service.
|
||||||
|
// All implementations should embed UnimplementedOperatorServiceServer
|
||||||
|
// for forward compatibility
|
||||||
|
type OperatorServiceServer interface {
|
||||||
|
// Transfer raft leadership to another node
|
||||||
|
TransferLeader(context.Context, *TransferLeaderRequest) (*TransferLeaderResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnimplementedOperatorServiceServer should be embedded to have forward compatible implementations.
|
||||||
|
type UnimplementedOperatorServiceServer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (UnimplementedOperatorServiceServer) TransferLeader(context.Context, *TransferLeaderRequest) (*TransferLeaderResponse, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method TransferLeader not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnsafeOperatorServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||||
|
// Use of this interface is not recommended, as added methods to OperatorServiceServer will
|
||||||
|
// result in compilation errors.
|
||||||
|
type UnsafeOperatorServiceServer interface {
|
||||||
|
mustEmbedUnimplementedOperatorServiceServer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterOperatorServiceServer(s grpc.ServiceRegistrar, srv OperatorServiceServer) {
|
||||||
|
s.RegisterService(&OperatorService_ServiceDesc, srv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func _OperatorService_TransferLeader_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(TransferLeaderRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(OperatorServiceServer).TransferLeader(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: "/hashicorp.consul.internal.operator.OperatorService/TransferLeader",
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(OperatorServiceServer).TransferLeader(ctx, req.(*TransferLeaderRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OperatorService_ServiceDesc is the grpc.ServiceDesc for OperatorService service.
|
||||||
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
|
// and not to be introspected or modified (even as a copy)
|
||||||
|
var OperatorService_ServiceDesc = grpc.ServiceDesc{
|
||||||
|
ServiceName: "hashicorp.consul.internal.operator.OperatorService",
|
||||||
|
HandlerType: (*OperatorServiceServer)(nil),
|
||||||
|
Methods: []grpc.MethodDesc{
|
||||||
|
{
|
||||||
|
MethodName: "TransferLeader",
|
||||||
|
Handler: _OperatorService_TransferLeader_Handler,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Streams: []grpc.StreamDesc{},
|
||||||
|
Metadata: "proto/pboperator/operator.proto",
|
||||||
|
}
|
|
@ -145,3 +145,37 @@ $ curl \
|
||||||
--request DELETE \
|
--request DELETE \
|
||||||
"http://127.0.0.1:8500/v1/operator/raft/peer?address=1.2.3.4:5678"
|
"http://127.0.0.1:8500/v1/operator/raft/peer?address=1.2.3.4:5678"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Transfer Raft Leadership
|
||||||
|
|
||||||
|
This endpoint transfers the Raft leadership from the current leader to a different Raft peer.
|
||||||
|
The new leader is selected at random unless explicitly specified with the `id` parameter.
|
||||||
|
|
||||||
|
| Method | Path | Produces |
|
||||||
|
| -------- | ------------------------------- | ------------------ |
|
||||||
|
| `POST` | `/operator/raft/transfer-leader` | `application/json` |
|
||||||
|
|
||||||
|
The table below shows this endpoint's support for
|
||||||
|
[blocking queries](/api-docs/features/blocking),
|
||||||
|
[consistency modes](/api-docs/features/consistency),
|
||||||
|
[agent caching](/api-docs/features/caching), and
|
||||||
|
[required ACLs](/api-docs#authentication).
|
||||||
|
|
||||||
|
| Blocking Queries | Consistency Modes | Agent Caching | ACL Required |
|
||||||
|
| ---------------- | ----------------- | ------------- | ---------------- |
|
||||||
|
| `NO` | `none` | `none` | `operator:write` |
|
||||||
|
|
||||||
|
The corresponding CLI command is [`consul operator raft transfer-leader`](/commands/operator/raft#transfer-leader).
|
||||||
|
|
||||||
|
### Query Parameters
|
||||||
|
|
||||||
|
- `id` `(string: "")` - Specifies the node ID of the Raft peer to transfer leadership to.
|
||||||
|
If empty, leadership transfers to a random server agent.
|
||||||
|
|
||||||
|
### Sample Request
|
||||||
|
|
||||||
|
```shell-session
|
||||||
|
$ curl \
|
||||||
|
--request POST \
|
||||||
|
"http://127.0.0.1:8500/v1/operator/raft/transfer-leader?id=09cfc046-e74a-ad49-1aad-c2161b7fe677"
|
||||||
|
```
|
||||||
|
|
|
@ -104,3 +104,26 @@ Usage: `consul operator raft remove-peer -address="IP:port"`
|
||||||
- `-id` - ID of the server to remove.
|
- `-id` - ID of the server to remove.
|
||||||
|
|
||||||
The return code will indicate success or failure.
|
The return code will indicate success or failure.
|
||||||
|
|
||||||
|
## transfer-leader
|
||||||
|
|
||||||
|
Corresponding HTTP API Endpoint: [\[POST\] /v1/operator/raft/transfer-leader](/api-docs/operator/raft#transfer-raft-leadership)
|
||||||
|
|
||||||
|
This command transfers Raft leadership to another server agent. If an `id` is provided, Consul transfers leadership to the server with that id.
|
||||||
|
|
||||||
|
Use this command to change leadership without restarting the leader node, which maintains quorum and workload capacity.
|
||||||
|
|
||||||
|
The table below shows this command's [required ACLs](/api-docs#authentication). Configuration of
|
||||||
|
[blocking queries](/api-docs/features/blocking) and [agent caching](/api-docs/features/caching)
|
||||||
|
are not supported from commands, but may be from the corresponding HTTP endpoint.
|
||||||
|
|
||||||
|
| ACL Required |
|
||||||
|
| ---------------- |
|
||||||
|
| `operator:write` |
|
||||||
|
|
||||||
|
Usage: `consul operator raft transfer-leader -id="server id"`
|
||||||
|
|
||||||
|
- `-id` - Specifies the node ID of the raft peer to transfer leadership to.
|
||||||
|
If empty, leadership transfers to a random server agent.
|
||||||
|
|
||||||
|
The return code indicates success or failure.
|
||||||
|
|
Loading…
Reference in New Issue