peering: send leader addr (#13342)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
alex 2022-06-06 10:00:38 -07:00 committed by GitHub
parent 7a039b46a2
commit bbbc50815a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 253 additions and 128 deletions

View File

@ -20,7 +20,7 @@ type peeringBackend struct {
srv *Server
connPool GRPCClientConner
apply *peeringApply
monitor *leadershipMonitor
addr *leaderAddr
}
var _ peering.Backend = (*peeringBackend)(nil)
@ -31,7 +31,7 @@ func NewPeeringBackend(srv *Server, connPool GRPCClientConner) peering.Backend {
srv: srv,
connPool: connPool,
apply: &peeringApply{srv: srv},
monitor: &leadershipMonitor{},
addr: &leaderAddr{},
}
}
@ -104,8 +104,8 @@ func (b *peeringBackend) Apply() peering.Apply {
return b.apply
}
func (b *peeringBackend) LeadershipMonitor() peering.LeadershipMonitor {
return b.monitor
func (b *peeringBackend) LeaderAddress() peering.LeaderAddress {
return b.addr
}
func (b *peeringBackend) EnterpriseCheckPartitions(partition string) error {
@ -116,19 +116,19 @@ func (b *peeringBackend) IsLeader() bool {
return b.srv.IsLeader()
}
type leadershipMonitor struct {
type leaderAddr struct {
lock sync.RWMutex
leaderAddr string
}
func (m *leadershipMonitor) UpdateLeaderAddr(addr string) {
func (m *leaderAddr) Set(addr string) {
m.lock.Lock()
defer m.lock.Unlock()
m.leaderAddr = addr
}
func (m *leadershipMonitor) GetLeaderAddr() string {
func (m *leaderAddr) Get() string {
m.lock.RLock()
defer m.lock.RUnlock()
@ -166,4 +166,4 @@ func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error {
}
var _ peering.Apply = (*peeringApply)(nil)
var _ peering.LeadershipMonitor = (*leadershipMonitor)(nil)
var _ peering.LeaderAddress = (*leaderAddr)(nil)

View File

@ -1659,7 +1659,7 @@ func (s *Server) trackLeaderChanges() {
}
s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr))
s.peeringService.Backend.LeadershipMonitor().UpdateLeaderAddr(string(leaderObs.LeaderAddr))
s.peeringService.Backend.LeaderAddress().Set(string(leaderObs.LeaderAddr))
case <-s.shutdownCh:
s.raft.DeregisterObserver(observer)
return

View File

@ -1952,9 +1952,9 @@ func TestServer_RPC_RateLimit(t *testing.T) {
})
}
// TestServer_Peering_LeadershipMonitor tests that a peering service can receive the leader address
// through the LeadershipMonitor IRL.
func TestServer_Peering_LeadershipMonitor(t *testing.T) {
// TestServer_Peering_LeadershipCheck tests that a peering service can receive the leader address
// through the LeaderAddress IRL.
func TestServer_Peering_LeadershipCheck(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
@ -1991,8 +1991,8 @@ func TestServer_Peering_LeadershipMonitor(t *testing.T) {
// the actual tests
// when leadership has been established s2 should have the address of s1
// in its leadership monitor in the peering service
peeringLeaderAddr := s2.peeringService.Backend.LeadershipMonitor().GetLeaderAddr()
// in the peering service
peeringLeaderAddr := s2.peeringService.Backend.LeaderAddress().Get()
require.Equal(t, s1.config.RPCAddr.String(), peeringLeaderAddr)
// test corollary by transitivity to future-proof against any setup bugs

View File

@ -107,20 +107,20 @@ type Backend interface {
Store() Store
Apply() Apply
LeadershipMonitor() LeadershipMonitor
LeaderAddress() LeaderAddress
}
// LeadershipMonitor provides a way for the consul server to update the peering service about
// LeaderAddress provides a way for the consul server to update the peering service about
// the server's leadership status.
// Server addresses should look like: ip:port
type LeadershipMonitor interface {
// UpdateLeaderAddr is called on a raft.LeaderObservation in a go routine in the consul server;
type LeaderAddress interface {
// Set is called on a raft.LeaderObservation in a go routine in the consul server;
// see trackLeaderChanges()
UpdateLeaderAddr(leaderAddr string)
Set(leaderAddr string)
// GetLeaderAddr provides the best hint for the current address of the leader.
// Get provides the best hint for the current address of the leader.
// There is no guarantee that this is the actual address of the leader.
GetLeaderAddr() string
Get() string
}
// Store provides a read-only interface for querying Peering data.
@ -473,9 +473,17 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource
if !s.Backend.IsLeader() {
// we are not the leader so we will hang up on the dialer
// TODO(peering): in the future we want to indicate the address of the leader server as a message to the dialer (best effort, non blocking)
s.logger.Error("cannot establish a peering stream on a follower node")
return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
st, err := grpcstatus.New(codes.FailedPrecondition,
"cannot establish a peering stream on a follower node").WithDetails(
&pbpeering.LeaderAddress{Address: s.Backend.LeaderAddress().Get()})
if err != nil {
s.logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
} else {
return st.Err()
}
}
// Initial message on a new stream must be a new subscription request.
@ -660,9 +668,17 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
if !s.Backend.IsLeader() {
// we are not the leader anymore so we will hang up on the dialer
// TODO(peering): in the future we want to indicate the address of the leader server as a message to the dialer (best effort, non blocking)
logger.Error("node is not a leader anymore; cannot continue streaming")
return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming")
st, err := grpcstatus.New(codes.FailedPrecondition,
"node is not a leader anymore; cannot continue streaming").WithDetails(
&pbpeering.LeaderAddress{Address: s.Backend.LeaderAddress().Get()})
if err != nil {
s.logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming")
} else {
return st.Err()
}
}
if req := msg.GetRequest(); req != nil {

View File

@ -47,6 +47,9 @@ func TestStreamResources_Server_Follower(t *testing.T) {
leader: func() bool {
return false
},
leaderAddress: &leaderAddress{
addr: "expected:address",
},
})
client := NewMockClient(context.Background())
@ -63,10 +66,20 @@ func TestStreamResources_Server_Follower(t *testing.T) {
}
}()
// expect error
msg, err := client.Recv()
require.Nil(t, msg)
require.Error(t, err)
require.EqualError(t, err, "rpc error: code = FailedPrecondition desc = cannot establish a peering stream on a follower node")
// expect a status error
st, ok := status.FromError(err)
require.True(t, ok, "need to get back a grpc status error")
deets := st.Details()
// expect a LeaderAddress message
exp := []interface{}{&pbpeering.LeaderAddress{Address: "expected:address"}}
prototest.AssertDeepEqual(t, exp, deets)
}
// TestStreamResources_Server_LeaderBecomesFollower simulates a srv that is a leader when the
@ -94,6 +107,9 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
store: store,
pub: publisher,
leader: leaderFunc,
leaderAddress: &leaderAddress{
addr: "expected:address",
},
})
client := NewMockClient(context.Background())
@ -147,10 +163,20 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
err2 := client.Send(input2)
require.NoError(t, err2)
// expect error
msg2, err2 := client.Recv()
require.Nil(t, msg2)
require.Error(t, err2)
require.EqualError(t, err2, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming")
// expect a status error
st, ok := status.FromError(err2)
require.True(t, ok, "need to get back a grpc status error")
deets := st.Details()
// expect a LeaderAddress message
exp := []interface{}{&pbpeering.LeaderAddress{Address: "expected:address"}}
prototest.AssertDeepEqual(t, exp, deets)
}
func TestStreamResources_Server_FirstRequest(t *testing.T) {
@ -901,28 +927,28 @@ func makeClient(
}
type testStreamBackend struct {
pub state.EventPublisher
store *state.Store
leader func() bool
leadershipMonitor *leadershipMonitor
pub state.EventPublisher
store *state.Store
leader func() bool
leaderAddress *leaderAddress
}
var _ LeadershipMonitor = (*leadershipMonitor)(nil)
var _ LeaderAddress = (*leaderAddress)(nil)
type leadershipMonitor struct {
type leaderAddress struct {
addr string
}
func (l *leadershipMonitor) UpdateLeaderAddr(addr string) {
func (l *leaderAddress) Set(addr string) {
// noop
}
func (l *leadershipMonitor) GetLeaderAddr() string {
// noop
return ""
func (l *leaderAddress) Get() string {
return l.addr
}
func (b *testStreamBackend) LeadershipMonitor() LeadershipMonitor {
return b.leadershipMonitor
func (b *testStreamBackend) LeaderAddress() LeaderAddress {
return b.leaderAddress
}
func (b *testStreamBackend) IsLeader() bool {

View File

@ -286,3 +286,13 @@ func (msg *ReplicationMessage_Terminated) MarshalBinary() ([]byte, error) {
func (msg *ReplicationMessage_Terminated) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *LeaderAddress) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *LeaderAddress) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

View File

@ -1720,6 +1720,56 @@ func (*ReplicationMessage_Response_) isReplicationMessage_Payload() {}
func (*ReplicationMessage_Terminated_) isReplicationMessage_Payload() {}
// LeaderAddress is sent when the peering service runs on a consul node
// that is not a leader. The node either lost leadership, or never was a leader.
type LeaderAddress struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// address is an ip:port best effort hint at what could be the cluster leader's address
Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
}
func (x *LeaderAddress) Reset() {
*x = LeaderAddress{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeering_peering_proto_msgTypes[25]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *LeaderAddress) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*LeaderAddress) ProtoMessage() {}
func (x *LeaderAddress) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeering_peering_proto_msgTypes[25]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use LeaderAddress.ProtoReflect.Descriptor instead.
func (*LeaderAddress) Descriptor() ([]byte, []int) {
return file_proto_pbpeering_peering_proto_rawDescGZIP(), []int{25}
}
func (x *LeaderAddress) GetAddress() string {
if x != nil {
return x.Address
}
return ""
}
// A Request requests to subscribe to a resource of a given type.
type ReplicationMessage_Request struct {
state protoimpl.MessageState
@ -1743,7 +1793,7 @@ type ReplicationMessage_Request struct {
func (x *ReplicationMessage_Request) Reset() {
*x = ReplicationMessage_Request{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeering_peering_proto_msgTypes[29]
mi := &file_proto_pbpeering_peering_proto_msgTypes[30]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1756,7 +1806,7 @@ func (x *ReplicationMessage_Request) String() string {
func (*ReplicationMessage_Request) ProtoMessage() {}
func (x *ReplicationMessage_Request) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeering_peering_proto_msgTypes[29]
mi := &file_proto_pbpeering_peering_proto_msgTypes[30]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1822,7 +1872,7 @@ type ReplicationMessage_Response struct {
func (x *ReplicationMessage_Response) Reset() {
*x = ReplicationMessage_Response{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeering_peering_proto_msgTypes[30]
mi := &file_proto_pbpeering_peering_proto_msgTypes[31]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1835,7 +1885,7 @@ func (x *ReplicationMessage_Response) String() string {
func (*ReplicationMessage_Response) ProtoMessage() {}
func (x *ReplicationMessage_Response) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeering_peering_proto_msgTypes[30]
mi := &file_proto_pbpeering_peering_proto_msgTypes[31]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -1897,7 +1947,7 @@ type ReplicationMessage_Terminated struct {
func (x *ReplicationMessage_Terminated) Reset() {
*x = ReplicationMessage_Terminated{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeering_peering_proto_msgTypes[31]
mi := &file_proto_pbpeering_peering_proto_msgTypes[32]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -1910,7 +1960,7 @@ func (x *ReplicationMessage_Terminated) String() string {
func (*ReplicationMessage_Terminated) ProtoMessage() {}
func (x *ReplicationMessage_Terminated) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeering_peering_proto_msgTypes[31]
mi := &file_proto_pbpeering_peering_proto_msgTypes[32]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -2155,68 +2205,71 @@ var file_proto_pbpeering_peering_proto_rawDesc = []byte{
0x77, 0x6e, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01,
0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x02, 0x1a, 0x0c, 0x0a, 0x0a,
0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x50, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2a, 0x53, 0x0a, 0x0c, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67,
0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x4e, 0x44, 0x45, 0x46, 0x49, 0x4e,
0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x49, 0x54, 0x49, 0x41, 0x4c, 0x10,
0x01, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, 0x12, 0x0b, 0x0a,
0x07, 0x46, 0x41, 0x49, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x45,
0x52, 0x4d, 0x49, 0x4e, 0x41, 0x54, 0x45, 0x44, 0x10, 0x04, 0x32, 0xea, 0x05, 0x0a, 0x0e, 0x50,
0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, 0x0a,
0x0d, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1d,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74,
0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e,
0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65,
0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a,
0x08, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x65, 0x12, 0x18, 0x2e, 0x70, 0x65, 0x65, 0x72,
0x69, 0x6e, 0x67, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x6e,
0x69, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48,
0x0a, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1b, 0x2e,
0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x52,
0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x65, 0x65,
0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x61, 0x64,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, 0x0b, 0x50, 0x65, 0x65, 0x72,
0x69, 0x6e, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e,
0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50,
0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x4e, 0x0a, 0x0d, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x44, 0x65, 0x6c,
0x65, 0x74, 0x65, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65,
0x65, 0x72, 0x69, 0x6e, 0x67, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65,
0x72, 0x69, 0x6e, 0x67, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x4b, 0x0a, 0x0c, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x57, 0x72, 0x69,
0x74, 0x65, 0x12, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65,
0x72, 0x69, 0x6e, 0x67, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69,
0x6e, 0x67, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x6f, 0x0a, 0x18, 0x54, 0x72, 0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x4c, 0x69,
0x73, 0x74, 0x42, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x28, 0x2e, 0x70, 0x65,
0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x72, 0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c,
0x65, 0x4c, 0x69, 0x73, 0x74, 0x42, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e,
0x54, 0x72, 0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x42,
0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x54, 0x0a, 0x0f, 0x54, 0x72, 0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x52,
0x65, 0x61, 0x64, 0x12, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x72,
0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x54,
0x72, 0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4f, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72,
0x69, 0x6e, 0x67, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67,
0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x84, 0x01, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x2e,
0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x42, 0x0c, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67,
0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f,
0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x70, 0x65, 0x65,
0x72, 0x69, 0x6e, 0x67, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x50, 0x65, 0x65,
0x72, 0x69, 0x6e, 0x67, 0xca, 0x02, 0x07, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0xe2, 0x02,
0x13, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61,
0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x07, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73,
0x2a, 0x53, 0x0a, 0x0c, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x65,
0x12, 0x0d, 0x0a, 0x09, 0x55, 0x4e, 0x44, 0x45, 0x46, 0x49, 0x4e, 0x45, 0x44, 0x10, 0x00, 0x12,
0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x49, 0x54, 0x49, 0x41, 0x4c, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06,
0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c,
0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x45, 0x52, 0x4d, 0x49, 0x4e, 0x41,
0x54, 0x45, 0x44, 0x10, 0x04, 0x32, 0xea, 0x05, 0x0a, 0x0e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e,
0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, 0x0a, 0x0d, 0x47, 0x65, 0x6e, 0x65,
0x72, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72,
0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x6b, 0x65,
0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69,
0x6e, 0x67, 0x2e, 0x47, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x08, 0x49, 0x6e, 0x69, 0x74,
0x69, 0x61, 0x74, 0x65, 0x12, 0x18, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x49,
0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74,
0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, 0x0b, 0x50, 0x65, 0x65,
0x72, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69,
0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e,
0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x4c, 0x69,
0x73, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65,
0x72, 0x69, 0x6e, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x1c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e,
0x67, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, 0x0a,
0x0d, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x1d,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67,
0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e,
0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x44,
0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a,
0x0c, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x1c, 0x2e,
0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x57,
0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x70, 0x65,
0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x57, 0x72, 0x69,
0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x6f, 0x0a, 0x18, 0x54, 0x72,
0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x42, 0x79, 0x53,
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x28, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67,
0x2e, 0x54, 0x72, 0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x4c, 0x69, 0x73, 0x74,
0x42, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x29, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x72, 0x75, 0x73, 0x74,
0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x42, 0x79, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x54, 0x0a, 0x0f, 0x54,
0x72, 0x75, 0x73, 0x74, 0x42, 0x75, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1f,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x72, 0x75, 0x73, 0x74, 0x42, 0x75,
0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x72, 0x75, 0x73, 0x74, 0x42,
0x75, 0x6e, 0x64, 0x6c, 0x65, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x4f, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, 0x75,
0x72, 0x63, 0x65, 0x73, 0x12, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x52,
0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x1a, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x52, 0x65, 0x70, 0x6c,
0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01,
0x30, 0x01, 0x42, 0x84, 0x01, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x69,
0x6e, 0x67, 0x42, 0x0c, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f,
0x50, 0x01, 0x5a, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68,
0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x70, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0xa2,
0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0xca,
0x02, 0x07, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0xe2, 0x02, 0x13, 0x50, 0x65, 0x65, 0x72,
0x69, 0x6e, 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea,
0x02, 0x07, 0x50, 0x65, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
}
var (
@ -2232,7 +2285,7 @@ func file_proto_pbpeering_peering_proto_rawDescGZIP() []byte {
}
var file_proto_pbpeering_peering_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_proto_pbpeering_peering_proto_msgTypes = make([]protoimpl.MessageInfo, 32)
var file_proto_pbpeering_peering_proto_msgTypes = make([]protoimpl.MessageInfo, 33)
var file_proto_pbpeering_peering_proto_goTypes = []interface{}{
(PeeringState)(0), // 0: peering.PeeringState
(ReplicationMessage_Response_Operation)(0), // 1: peering.ReplicationMessage.Response.Operation
@ -2261,33 +2314,34 @@ var file_proto_pbpeering_peering_proto_goTypes = []interface{}{
(*InitiateRequest)(nil), // 24: peering.InitiateRequest
(*InitiateResponse)(nil), // 25: peering.InitiateResponse
(*ReplicationMessage)(nil), // 26: peering.ReplicationMessage
nil, // 27: peering.Peering.MetaEntry
nil, // 28: peering.PeeringWriteRequest.MetaEntry
nil, // 29: peering.GenerateTokenRequest.MetaEntry
nil, // 30: peering.InitiateRequest.MetaEntry
(*ReplicationMessage_Request)(nil), // 31: peering.ReplicationMessage.Request
(*ReplicationMessage_Response)(nil), // 32: peering.ReplicationMessage.Response
(*ReplicationMessage_Terminated)(nil), // 33: peering.ReplicationMessage.Terminated
(*pbstatus.Status)(nil), // 34: status.Status
(*anypb.Any)(nil), // 35: google.protobuf.Any
(*LeaderAddress)(nil), // 27: peering.LeaderAddress
nil, // 28: peering.Peering.MetaEntry
nil, // 29: peering.PeeringWriteRequest.MetaEntry
nil, // 30: peering.GenerateTokenRequest.MetaEntry
nil, // 31: peering.InitiateRequest.MetaEntry
(*ReplicationMessage_Request)(nil), // 32: peering.ReplicationMessage.Request
(*ReplicationMessage_Response)(nil), // 33: peering.ReplicationMessage.Response
(*ReplicationMessage_Terminated)(nil), // 34: peering.ReplicationMessage.Terminated
(*pbstatus.Status)(nil), // 35: status.Status
(*anypb.Any)(nil), // 36: google.protobuf.Any
}
var file_proto_pbpeering_peering_proto_depIdxs = []int32{
27, // 0: peering.Peering.Meta:type_name -> peering.Peering.MetaEntry
28, // 0: peering.Peering.Meta:type_name -> peering.Peering.MetaEntry
0, // 1: peering.Peering.State:type_name -> peering.PeeringState
2, // 2: peering.PeeringReadResponse.Peering:type_name -> peering.Peering
2, // 3: peering.PeeringListResponse.Peerings:type_name -> peering.Peering
2, // 4: peering.PeeringWriteRequest.Peering:type_name -> peering.Peering
28, // 5: peering.PeeringWriteRequest.Meta:type_name -> peering.PeeringWriteRequest.MetaEntry
29, // 5: peering.PeeringWriteRequest.Meta:type_name -> peering.PeeringWriteRequest.MetaEntry
3, // 6: peering.TrustBundleListByServiceResponse.Bundles:type_name -> peering.PeeringTrustBundle
3, // 7: peering.TrustBundleReadResponse.Bundle:type_name -> peering.PeeringTrustBundle
3, // 8: peering.PeeringTrustBundleWriteRequest.PeeringTrustBundle:type_name -> peering.PeeringTrustBundle
29, // 9: peering.GenerateTokenRequest.Meta:type_name -> peering.GenerateTokenRequest.MetaEntry
30, // 10: peering.InitiateRequest.Meta:type_name -> peering.InitiateRequest.MetaEntry
31, // 11: peering.ReplicationMessage.request:type_name -> peering.ReplicationMessage.Request
32, // 12: peering.ReplicationMessage.response:type_name -> peering.ReplicationMessage.Response
33, // 13: peering.ReplicationMessage.terminated:type_name -> peering.ReplicationMessage.Terminated
34, // 14: peering.ReplicationMessage.Request.Error:type_name -> status.Status
35, // 15: peering.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any
30, // 9: peering.GenerateTokenRequest.Meta:type_name -> peering.GenerateTokenRequest.MetaEntry
31, // 10: peering.InitiateRequest.Meta:type_name -> peering.InitiateRequest.MetaEntry
32, // 11: peering.ReplicationMessage.request:type_name -> peering.ReplicationMessage.Request
33, // 12: peering.ReplicationMessage.response:type_name -> peering.ReplicationMessage.Response
34, // 13: peering.ReplicationMessage.terminated:type_name -> peering.ReplicationMessage.Terminated
35, // 14: peering.ReplicationMessage.Request.Error:type_name -> status.Status
36, // 15: peering.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any
1, // 16: peering.ReplicationMessage.Response.operation:type_name -> peering.ReplicationMessage.Response.Operation
22, // 17: peering.PeeringService.GenerateToken:input_type -> peering.GenerateTokenRequest
24, // 18: peering.PeeringService.Initiate:input_type -> peering.InitiateRequest
@ -2620,8 +2674,8 @@ func file_proto_pbpeering_peering_proto_init() {
return nil
}
}
file_proto_pbpeering_peering_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Request); i {
file_proto_pbpeering_peering_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*LeaderAddress); i {
case 0:
return &v.state
case 1:
@ -2633,7 +2687,7 @@ func file_proto_pbpeering_peering_proto_init() {
}
}
file_proto_pbpeering_peering_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Response); i {
switch v := v.(*ReplicationMessage_Request); i {
case 0:
return &v.state
case 1:
@ -2645,6 +2699,18 @@ func file_proto_pbpeering_peering_proto_init() {
}
}
file_proto_pbpeering_peering_proto_msgTypes[31].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Response); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_pbpeering_peering_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Terminated); i {
case 0:
return &v.state
@ -2668,7 +2734,7 @@ func file_proto_pbpeering_peering_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_pbpeering_peering_proto_rawDesc,
NumEnums: 2,
NumMessages: 32,
NumMessages: 33,
NumExtensions: 0,
NumServices: 1,
},

View File

@ -365,3 +365,10 @@ message ReplicationMessage {
// This message signals to the peer that they should clean up their local state about the peering.
message Terminated {}
}
// LeaderAddress is sent when the peering service runs on a consul node
// that is not a leader. The node either lost leadership, or never was a leader.
message LeaderAddress {
// address is an ip:port best effort hint at what could be the cluster leader's address
string address = 1;
}