peerstream: require a resource subscription to receive updates of that type (#13767)

This mimics xDS's discovery protocol where you must request a resource
explicitly for the exporting side to send those events to you.

As part of this I aligned the overall ResourceURL with the TypeURL that
gets embedded into the encoded protobuf Any construct. The
CheckServiceNodes is now wrapped in a better named "ExportedService"
struct now.
This commit is contained in:
R.B. Boyer 2022-07-15 15:03:40 -05:00 committed by GitHub
parent c737301093
commit cd513aeead
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 736 additions and 345 deletions

View File

@ -5,10 +5,9 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/genproto/googleapis/rpc/code"
newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
@ -39,7 +38,16 @@ func makeServiceResponse(
logger hclog.Logger, logger hclog.Logger,
update cache.UpdateEvent, update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) { ) (*pbpeerstream.ReplicationMessage_Response, error) {
any, csn, err := marshalToProtoAny[*pbservice.IndexedCheckServiceNodes](update.Result) csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
}
export := &pbpeerstream.ExportedService{
Nodes: csn.Nodes,
}
any, err := anypb.New(export)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal: %w", err) return nil, fmt.Errorf("failed to marshal: %w", err)
} }
@ -53,9 +61,9 @@ func makeServiceResponse(
// //
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
// Case #1 is a no-op for the importing peer. // Case #1 is a no-op for the importing peer.
if len(csn.Nodes) == 0 { if len(export.Nodes) == 0 {
return &pbpeerstream.ReplicationMessage_Response{ return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
// TODO(peering): Nonce management // TODO(peering): Nonce management
Nonce: "", Nonce: "",
ResourceID: serviceName, ResourceID: serviceName,
@ -65,7 +73,7 @@ func makeServiceResponse(
// If there are nodes in the response, we push them as an UPSERT operation. // If there are nodes in the response, we push them as an UPSERT operation.
return &pbpeerstream.ReplicationMessage_Response{ return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
// TODO(peering): Nonce management // TODO(peering): Nonce management
Nonce: "", Nonce: "",
ResourceID: serviceName, ResourceID: serviceName,
@ -84,7 +92,7 @@ func makeCARootsResponse(
} }
return &pbpeerstream.ReplicationMessage_Response{ return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLRoots, ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
// TODO(peering): Nonce management // TODO(peering): Nonce management
Nonce: "", Nonce: "",
ResourceID: "roots", ResourceID: "roots",
@ -97,13 +105,13 @@ func makeCARootsResponse(
// the protobuf.Any type, the asserted T type, and any errors // the protobuf.Any type, the asserted T type, and any errors
// during marshalling or type assertion. // during marshalling or type assertion.
// `in` MUST be of type T or it returns an error. // `in` MUST be of type T or it returns an error.
func marshalToProtoAny[T proto.Message](in any) (*anypb.Any, T, error) { func marshalToProtoAny[T newproto.Message](in any) (*anypb.Any, T, error) {
typ, ok := in.(T) typ, ok := in.(T)
if !ok { if !ok {
var outType T var outType T
return nil, typ, fmt.Errorf("input type is not %T: %T", outType, in) return nil, typ, fmt.Errorf("input type is not %T: %T", outType, in)
} }
any, err := ptypes.MarshalAny(typ) any, err := anypb.New(typ)
if err != nil { if err != nil {
return nil, typ, err return nil, typ, err
} }
@ -186,20 +194,23 @@ func (s *Server) handleUpsert(
resource *anypb.Any, resource *anypb.Any,
logger hclog.Logger, logger hclog.Logger,
) error { ) error {
if resource.TypeUrl != resourceURL {
return fmt.Errorf("mismatched resourceURL %q and Any typeUrl %q", resourceURL, resource.TypeUrl)
}
switch resourceURL { switch resourceURL {
case pbpeerstream.TypeURLService: case pbpeerstream.TypeURLExportedService:
sn := structs.ServiceNameFromString(resourceID) sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition) sn.OverridePartition(partition)
csn := &pbservice.IndexedCheckServiceNodes{} export := &pbpeerstream.ExportedService{}
if err := ptypes.UnmarshalAny(resource, csn); err != nil { if err := resource.UnmarshalTo(export); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err) return fmt.Errorf("failed to unmarshal resource: %w", err)
} }
err := s.handleUpdateService(peerName, partition, sn, csn) err := s.handleUpdateService(peerName, partition, sn, export)
if err != nil { if err != nil {
logger.Error("did not increment imported services count", "service_name", sn.String(), "error", err) return fmt.Errorf("did not increment imported services count for service=%q: %w", sn.String(), err)
return err
} }
logger.Trace("incrementing imported services count", "service_name", sn.String()) logger.Trace("incrementing imported services count", "service_name", sn.String())
@ -207,9 +218,9 @@ func (s *Server) handleUpsert(
return nil return nil
case pbpeerstream.TypeURLRoots: case pbpeerstream.TypeURLPeeringTrustBundle:
roots := &pbpeering.PeeringTrustBundle{} roots := &pbpeering.PeeringTrustBundle{}
if err := ptypes.UnmarshalAny(resource, roots); err != nil { if err := resource.UnmarshalTo(roots); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err) return fmt.Errorf("failed to unmarshal resource: %w", err)
} }
@ -232,7 +243,7 @@ func (s *Server) handleUpdateService(
peerName string, peerName string,
partition string, partition string,
sn structs.ServiceName, sn structs.ServiceName,
pbNodes *pbservice.IndexedCheckServiceNodes, export *pbpeerstream.ExportedService,
) error { ) error {
// Capture instances in the state store for reconciliation later. // Capture instances in the state store for reconciliation later.
_, storedInstances, err := s.GetStore().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName) _, storedInstances, err := s.GetStore().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName)
@ -240,7 +251,7 @@ func (s *Server) handleUpdateService(
return fmt.Errorf("failed to read imported services: %w", err) return fmt.Errorf("failed to read imported services: %w", err)
} }
structsNodes, err := pbNodes.CheckServiceNodesToStruct() structsNodes, err := export.CheckServiceNodesToStruct()
if err != nil { if err != nil {
return fmt.Errorf("failed to convert protobuf instances to structs: %w", err) return fmt.Errorf("failed to convert protobuf instances to structs: %w", err)
} }
@ -444,7 +455,7 @@ func (s *Server) handleDelete(
logger hclog.Logger, logger hclog.Logger,
) error { ) error {
switch resourceURL { switch resourceURL {
case pbpeerstream.TypeURLService: case pbpeerstream.TypeURLExportedService:
sn := structs.ServiceNameFromString(resourceID) sn := structs.ServiceNameFromString(resourceID)
sn.OverridePartition(partition) sn.OverridePartition(partition)

View File

@ -9,7 +9,6 @@ import (
"github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status" grpcstatus "google.golang.org/grpc/status"
@ -99,11 +98,12 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
} }
streamReq := HandleStreamRequest{ streamReq := HandleStreamRequest{
LocalID: p.ID, LocalID: p.ID,
RemoteID: "", RemoteID: "",
PeerName: p.Name, PeerName: p.Name,
Partition: p.Partition, Partition: p.Partition,
Stream: stream, InitialResourceURL: req.ResourceURL,
Stream: stream,
} }
err = s.HandleStream(streamReq) err = s.HandleStream(streamReq)
// A nil error indicates that the peering was deleted and the stream needs to be gracefully shutdown. // A nil error indicates that the peering was deleted and the stream needs to be gracefully shutdown.
@ -129,6 +129,9 @@ type HandleStreamRequest struct {
// Partition is the local partition associated with the peer. // Partition is the local partition associated with the peer.
Partition string Partition string
// InitialResourceURL is the ResourceURL from the initial Request.
InitialResourceURL string
// Stream is the open stream to the peer cluster. // Stream is the open stream to the peer cluster.
Stream BidirectionalStream Stream BidirectionalStream
} }
@ -183,6 +186,13 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
} }
} }
remoteSubTracker := newResourceSubscriptionTracker()
if streamReq.InitialResourceURL != "" {
if remoteSubTracker.Subscribe(streamReq.InitialResourceURL) {
logger.Info("subscribing to resource type", "resourceURL", streamReq.InitialResourceURL)
}
}
mgr := newSubscriptionManager( mgr := newSubscriptionManager(
streamReq.Stream.Context(), streamReq.Stream.Context(),
logger, logger,
@ -190,24 +200,31 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
trustDomain, trustDomain,
s.Backend, s.Backend,
s.GetStore, s.GetStore,
remoteSubTracker,
) )
subCh := mgr.subscribe(streamReq.Stream.Context(), streamReq.LocalID, streamReq.PeerName, streamReq.Partition) subCh := mgr.subscribe(streamReq.Stream.Context(), streamReq.LocalID, streamReq.PeerName, streamReq.Partition)
sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ // Subscribe to all relevant resource types.
ResourceURL: pbpeerstream.TypeURLService, for _, resourceURL := range []string{
PeerID: streamReq.RemoteID, pbpeerstream.TypeURLExportedService,
}) pbpeerstream.TypeURLPeeringTrustBundle,
logTraceSend(logger, sub) } {
sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
ResourceURL: resourceURL,
PeerID: streamReq.RemoteID,
})
logTraceSend(logger, sub)
if err := streamReq.Stream.Send(sub); err != nil { if err := streamReq.Stream.Send(sub); err != nil {
if err == io.EOF { if err == io.EOF {
logger.Info("stream ended by peer") logger.Info("stream ended by peer")
status.TrackReceiveError(err.Error()) status.TrackReceiveError(err.Error())
return nil return nil
}
// TODO(peering) Test error handling in calls to Send/Recv
status.TrackSendError(err.Error())
return fmt.Errorf("failed to send subscription for %q to stream: %w", resourceURL, err)
} }
// TODO(peering) Test error handling in calls to Send/Recv
status.TrackSendError(err.Error())
return fmt.Errorf("failed to send to stream: %v", err)
} }
// TODO(peering): Should this be buffered? // TODO(peering): Should this be buffered?
@ -289,17 +306,86 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
if !pbpeerstream.KnownTypeURL(req.ResourceURL) { if !pbpeerstream.KnownTypeURL(req.ResourceURL) {
return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL) return grpcstatus.Errorf(codes.InvalidArgument, "subscription request to unknown resource URL: %s", req.ResourceURL)
} }
switch {
case req.ResponseNonce == "":
// TODO(peering): This can happen on a client peer since they don't try to receive subscriptions before entering HandleStream.
// Should change that behavior or only allow it that one time.
case req.Error != nil && (req.Error.Code != int32(code.Code_OK) || req.Error.Message != ""): // There are different formats of requests depending upon where in the stream lifecycle we are.
//
// 1. Initial Request: This is the first request being received
// FROM the establishing peer. This is handled specially in
// (*Server).StreamResources BEFORE calling
// (*Server).HandleStream. This takes care of determining what
// the PeerID is for the stream. This is ALSO treated as (2) below.
//
// 2. Subscription Request: This is the first request for a
// given ResourceURL within a stream. The Initial Request (1)
// is always one of these as well.
//
// These must contain a valid ResourceURL with no Error or
// ResponseNonce set.
//
// It is valid to subscribe to the same ResourceURL twice
// within the lifetime of a stream, but all duplicate
// subscriptions are treated as no-ops upon receipt.
//
// 3. ACK Request: This is the message sent in reaction to an
// earlier Response to indicate that the response was processed
// by the other side successfully.
//
// These must contain a ResponseNonce and no Error.
//
// 4. NACK Request: This is the message sent in reaction to an
// earlier Response to indicate that the response was NOT
// processed by the other side successfully.
//
// These must contain a ResponseNonce and an Error.
//
if !remoteSubTracker.IsSubscribed(req.ResourceURL) {
// This must be a new subscription request to add a new
// resource type, vet it like a new request.
if !streamReq.WasDialed() {
if req.PeerID != "" && req.PeerID != streamReq.RemoteID {
// Not necessary after the first request from the dialer,
// but if provided must match.
return grpcstatus.Errorf(codes.InvalidArgument,
"initial subscription requests for a resource type must have consistent PeerID values: got=%q expected=%q",
req.PeerID,
streamReq.RemoteID,
)
}
}
if req.ResponseNonce != "" {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription requests for a resource type must not contain a nonce")
}
if req.Error != nil {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request for a resource type must not contain an error")
}
if remoteSubTracker.Subscribe(req.ResourceURL) {
logger.Info("subscribing to resource type", "resourceURL", req.ResourceURL)
}
status.TrackAck()
continue
}
// At this point we have a valid ResourceURL and we are subscribed to it.
switch {
case req.ResponseNonce == "" && req.Error != nil:
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request for a resource type must not contain an error")
case req.ResponseNonce != "" && req.Error == nil: // ACK
// TODO(peering): handle ACK fully
status.TrackAck()
case req.ResponseNonce != "" && req.Error != nil: // NACK
// TODO(peering): handle NACK fully
logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message) logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message)
status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message)) status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message))
default: default:
status.TrackAck() // This branch might be dead code, but it could also happen
// during a stray 're-subscribe' so just ignore the
// message.
} }
continue continue
@ -425,3 +511,63 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
logger.Trace("replication message", "direction", dir, "protobuf", out) logger.Trace("replication message", "direction", dir, "protobuf", out)
} }
// resourceSubscriptionTracker is used to keep track of the ResourceURLs that a
// stream has subscribed to and can notify you when a subscription comes in by
// closing the channels returned by SubscribedChan.
type resourceSubscriptionTracker struct {
// notifierMap keeps track of a notification channel for each resourceURL.
// Keys may exist in here even when they do not exist in 'subscribed' as
// calling SubscribedChan has to possibly create and and hand out a
// notification channel in advance of any notification.
notifierMap map[string]chan struct{}
// subscribed is a set that keeps track of resourceURLs that are currently
// subscribed to. Keys are never deleted. If a key is present in this map
// it is also present in 'notifierMap'.
subscribed map[string]struct{}
}
func newResourceSubscriptionTracker() *resourceSubscriptionTracker {
return &resourceSubscriptionTracker{
subscribed: make(map[string]struct{}),
notifierMap: make(map[string]chan struct{}),
}
}
// IsSubscribed returns true if the given ResourceURL has an active subscription.
func (t *resourceSubscriptionTracker) IsSubscribed(resourceURL string) bool {
_, ok := t.subscribed[resourceURL]
return ok
}
// Subscribe subscribes to the given ResourceURL. It will return true if this
// was the FIRST time a subscription occurred. It will also close the
// notification channel associated with this ResourceURL.
func (t *resourceSubscriptionTracker) Subscribe(resourceURL string) bool {
if _, ok := t.subscribed[resourceURL]; ok {
return false
}
t.subscribed[resourceURL] = struct{}{}
// and notify
ch := t.ensureNotifierChan(resourceURL)
close(ch)
return true
}
// SubscribedChan returns a channel that will be closed when the ResourceURL is
// subscribed using the Subscribe method.
func (t *resourceSubscriptionTracker) SubscribedChan(resourceURL string) <-chan struct{} {
return t.ensureNotifierChan(resourceURL)
}
func (t *resourceSubscriptionTracker) ensureNotifierChan(resourceURL string) chan struct{} {
if ch, ok := t.notifierMap[resourceURL]; ok {
return ch
}
ch := make(chan struct{})
t.notifierMap[resourceURL] = ch
return ch
}

View File

@ -12,15 +12,14 @@ import (
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
@ -97,18 +96,6 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
backend.leaderAddr = "expected:address" backend.leaderAddr = "expected:address"
}) })
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
err := srv.StreamResources(client.ReplicationStream)
if err != nil {
errCh <- err
}
}()
p := writePeeringToBeDialed(t, store, 1, "my-peer") p := writePeeringToBeDialed(t, store, 1, "my-peer")
require.Empty(t, p.PeerID, "should be empty if being dialed") require.Empty(t, p.PeerID, "should be empty if being dialed")
peerID := p.ID peerID := p.ID
@ -116,53 +103,73 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
// Set the initial roots and CA configuration. // Set the initial roots and CA configuration.
_, _ = writeInitialRootsAndCA(t, store) _, _ = writeInitialRootsAndCA(t, store)
// Receive a subscription from a peer client := NewMockClient(context.Background())
sub := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ errCh := make(chan error, 1)
Request: &pbpeerstream.ReplicationMessage_Request{ client.ErrCh = errCh
PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLService, go func() {
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err
}
}()
// Receive a subscription from a peer. This message arrives while the
// server is a leader and should work.
testutil.RunStep(t, "send subscription request to leader and consume its two requests", func(t *testing.T) {
sub := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLExportedService,
},
}, },
}, }
} err := client.Send(sub)
err := client.Send(sub) require.NoError(t, err)
require.NoError(t, err)
msg, err := client.Recv() msg1, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
require.NotEmpty(t, msg) require.NotEmpty(t, msg1)
receiveRoots, err := client.Recv() msg2, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, receiveRoots.GetResponse()) require.NotEmpty(t, msg2)
require.Equal(t, pbpeerstream.TypeURLRoots, receiveRoots.GetResponse().ResourceURL) })
input2 := &pbpeerstream.ReplicationMessage{ // The ACK will be a new request but at this point the server is not the
Payload: &pbpeerstream.ReplicationMessage_Request_{ // leader in the test and this should fail.
Request: &pbpeerstream.ReplicationMessage_Request{ testutil.RunStep(t, "ack fails with non leader", func(t *testing.T) {
ResourceURL: pbpeerstream.TypeURLService, ack := &pbpeerstream.ReplicationMessage{
ResponseNonce: "1", Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1",
},
}, },
}, }
}
err2 := client.Send(input2) err := client.Send(ack)
require.NoError(t, err2) require.NoError(t, err)
// expect error // expect error
msg2, err2 := client.Recv() msg, err := client.Recv()
require.Nil(t, msg2) require.Nil(t, msg)
require.Error(t, err2) require.Error(t, err)
require.EqualError(t, err2, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming") require.EqualError(t, err, "rpc error: code = FailedPrecondition desc = node is not a leader anymore; cannot continue streaming")
// expect a status error // expect a status error
st, ok := status.FromError(err2) st, ok := status.FromError(err)
require.True(t, ok, "need to get back a grpc status error") require.True(t, ok, "need to get back a grpc status error")
deets := st.Details()
// expect a LeaderAddress message // expect a LeaderAddress message
exp := []interface{}{&pbpeerstream.LeaderAddress{Address: "expected:address"}} expect := []interface{}{
prototest.AssertDeepEqual(t, exp, deets) &pbpeerstream.LeaderAddress{Address: "expected:address"},
}
prototest.AssertDeepEqual(t, expect, st.Details())
})
} }
func TestStreamResources_Server_FirstRequest(t *testing.T) { func TestStreamResources_Server_FirstRequest(t *testing.T) {
@ -204,7 +211,7 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
input: &pbpeerstream.ReplicationMessage{ input: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{ Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{ Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api-service", ResourceID: "api-service",
Nonce: "2", Nonce: "2",
}, },
@ -251,7 +258,7 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: "63b60245-c475-426b-b314-4588d210859d", PeerID: "63b60245-c475-426b-b314-4588d210859d",
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
}, },
}, },
}, },
@ -291,7 +298,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
receiveRoots, err := client.Recv() receiveRoots, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, receiveRoots.GetResponse()) require.NotNil(t, receiveRoots.GetResponse())
require.Equal(t, pbpeerstream.TypeURLRoots, receiveRoots.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, receiveRoots.GetResponse().ResourceURL)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) { testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
@ -347,7 +354,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}) })
}) })
var sequence uint64
var lastSendSuccess time.Time var lastSendSuccess time.Time
testutil.RunStep(t, "ack tracked as success", func(t *testing.T) { testutil.RunStep(t, "ack tracked as success", func(t *testing.T) {
@ -355,18 +361,17 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID, PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1", ResponseNonce: "1",
// Acks do not have an Error populated in the request // Acks do not have an Error populated in the request
}, },
}, },
} }
lastSendSuccess = it.FutureNow(1)
err := client.Send(ack) err := client.Send(ack)
require.NoError(t, err) require.NoError(t, err)
sequence++
lastSendSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC()
expect := Status{ expect := Status{
Connected: true, Connected: true,
@ -388,7 +393,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID, PeerID: peerID,
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "2", ResponseNonce: "2",
Error: &pbstatus.Status{ Error: &pbstatus.Status{
Code: int32(code.Code_UNAVAILABLE), Code: int32(code.Code_UNAVAILABLE),
@ -397,12 +402,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}, },
}, },
} }
lastNack = it.FutureNow(1)
err := client.Send(nack) err := client.Send(nack)
require.NoError(t, err) require.NoError(t, err)
sequence++
lastNackMsg = "client peer was unable to apply resource: bad bad not good" lastNackMsg = "client peer was unable to apply resource: bad bad not good"
lastNack = it.base.Add(time.Duration(sequence) * time.Second).UTC()
expect := Status{ expect := Status{
Connected: true, Connected: true,
@ -424,22 +429,22 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{ resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{ Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{ Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api", ResourceID: "api",
Nonce: "21", Nonce: "21",
Operation: pbpeerstream.Operation_OPERATION_UPSERT, Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}), Resource: makeAnyPB(t, &pbpeerstream.ExportedService{}),
}, },
}, },
} }
lastRecvSuccess = it.FutureNow(1)
err := client.Send(resp) err := client.Send(resp)
require.NoError(t, err) require.NoError(t, err)
sequence++
expectRoots := &pbpeerstream.ReplicationMessage{ expectRoots := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{ Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{ Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLRoots, ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
ResourceID: "roots", ResourceID: "roots",
Resource: makeAnyPB(t, &pbpeering.PeeringTrustBundle{ Resource: makeAnyPB(t, &pbpeering.PeeringTrustBundle{
TrustDomain: connect.TestTrustDomain, TrustDomain: connect.TestTrustDomain,
@ -460,15 +465,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expectAck := &pbpeerstream.ReplicationMessage{ expectAck := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "21", ResponseNonce: "21",
}, },
}, },
} }
prototest.AssertDeepEqual(t, expectAck, ack) prototest.AssertDeepEqual(t, expectAck, ack)
lastRecvSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC()
api := structs.NewServiceName("api", nil) api := structs.NewServiceName("api", nil)
expect := Status{ expect := Status{
@ -496,7 +499,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
resp := &pbpeerstream.ReplicationMessage{ resp := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{ Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: &pbpeerstream.ReplicationMessage_Response{ Response: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "web", ResourceID: "web",
Nonce: "24", Nonce: "24",
@ -505,9 +508,9 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}, },
}, },
} }
lastRecvError = it.FutureNow(1)
err := client.Send(resp) err := client.Send(resp)
require.NoError(t, err) require.NoError(t, err)
sequence++
ack, err := client.Recv() ack, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
@ -515,7 +518,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
expectNack := &pbpeerstream.ReplicationMessage{ expectNack := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "24", ResponseNonce: "24",
Error: &pbstatus.Status{ Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT), Code: int32(code.Code_INVALID_ARGUMENT),
@ -526,7 +529,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
} }
prototest.AssertDeepEqual(t, expectNack, ack) prototest.AssertDeepEqual(t, expectNack, ack)
lastRecvError = it.base.Add(time.Duration(sequence) * time.Second).UTC()
lastRecvErrorMsg = `unsupported operation: "OPERATION_UNSPECIFIED"` lastRecvErrorMsg = `unsupported operation: "OPERATION_UNSPECIFIED"`
api := structs.NewServiceName("api", nil) api := structs.NewServiceName("api", nil)
@ -552,14 +554,12 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}) })
testutil.RunStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) { testutil.RunStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) {
lastRecvError = it.FutureNow(1)
disconnectTime := it.FutureNow(2)
lastRecvErrorMsg = io.EOF.Error()
client.Close() client.Close()
sequence++
lastRecvError := it.base.Add(time.Duration(sequence) * time.Second).UTC()
sequence++
disconnectTime := it.base.Add(time.Duration(sequence) * time.Second).UTC()
api := structs.NewServiceName("api", nil) api := structs.NewServiceName("api", nil)
expect := Status{ expect := Status{
@ -569,8 +569,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
LastNackMessage: lastNackMsg, LastNackMessage: lastNackMsg,
DisconnectTime: disconnectTime, DisconnectTime: disconnectTime,
LastReceiveSuccess: lastRecvSuccess, LastReceiveSuccess: lastRecvSuccess,
LastReceiveErrorMessage: io.EOF.Error(),
LastReceiveError: lastRecvError, LastReceiveError: lastRecvError,
LastReceiveErrorMessage: lastRecvErrorMsg,
ImportedServices: map[string]struct{}{ ImportedServices: map[string]struct{}{
api.String(): {}, api.String(): {},
}, },
@ -654,35 +654,35 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
expectReplEvents(t, client, expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
// Roots tested in TestStreamResources_Server_CARootUpdates // Roots tested in TestStreamResources_Server_CARootUpdates
}, },
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// no mongo instances exist // no mongo instances exist
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoSN, msg.GetResponse().ResourceID) require.Equal(t, mongoSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource) require.Nil(t, msg.GetResponse().Resource)
}, },
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// proxies can't export because no mesh gateway exists yet // proxies can't export because no mesh gateway exists yet
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource) require.Nil(t, msg.GetResponse().Resource)
}, },
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID) require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes var nodes pbpeerstream.ExportedService
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1) require.Len(t, nodes.Nodes, 1)
}, },
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
// proxies can't export because no mesh gateway exists yet // proxies can't export because no mesh gateway exists yet
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource) require.Nil(t, msg.GetResponse().Resource)
@ -704,12 +704,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
expectReplEvents(t, client, expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID) require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes var nodes pbpeerstream.ExportedService
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1) require.Len(t, nodes.Nodes, 1)
pm := nodes.Nodes[0].Service.Connect.PeerMeta pm := nodes.Nodes[0].Service.Connect.PeerMeta
@ -721,12 +721,12 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.Equal(t, spiffeIDs, pm.SpiffeID) require.Equal(t, spiffeIDs, pm.SpiffeID)
}, },
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLService, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLExportedService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID) require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes var nodes pbpeerstream.ExportedService
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(t, nodes.Nodes, 1) require.Len(t, nodes.Nodes, 1)
pm := nodes.Nodes[0].Service.Connect.PeerMeta pm := nodes.Nodes[0].Service.Connect.PeerMeta
@ -758,8 +758,8 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
require.Equal(r, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) require.Equal(r, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
var nodes pbservice.IndexedCheckServiceNodes var nodes pbpeerstream.ExportedService
require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&nodes))
require.Len(r, nodes.Nodes, 1) require.Len(r, nodes.Nodes, 1)
}) })
}) })
@ -824,12 +824,12 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
testutil.RunStep(t, "initial CA Roots replication", func(t *testing.T) { testutil.RunStep(t, "initial CA Roots replication", func(t *testing.T) {
expectReplEvents(t, client, expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
require.Equal(t, "roots", msg.GetResponse().ResourceID) require.Equal(t, "roots", msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var trustBundle pbpeering.PeeringTrustBundle var trustBundle pbpeering.PeeringTrustBundle
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &trustBundle)) require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&trustBundle))
require.ElementsMatch(t, []string{rootA.RootCert}, trustBundle.RootPEMs) require.ElementsMatch(t, []string{rootA.RootCert}, trustBundle.RootPEMs)
expect := connect.SpiffeIDSigningForCluster(clusterID).Host() expect := connect.SpiffeIDSigningForCluster(clusterID).Host()
@ -853,12 +853,12 @@ func TestStreamResources_Server_CARootUpdates(t *testing.T) {
expectReplEvents(t, client, expectReplEvents(t, client,
func(t *testing.T, msg *pbpeerstream.ReplicationMessage) { func(t *testing.T, msg *pbpeerstream.ReplicationMessage) {
require.Equal(t, pbpeerstream.TypeURLRoots, msg.GetResponse().ResourceURL) require.Equal(t, pbpeerstream.TypeURLPeeringTrustBundle, msg.GetResponse().ResourceURL)
require.Equal(t, "roots", msg.GetResponse().ResourceID) require.Equal(t, "roots", msg.GetResponse().ResourceID)
require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation) require.Equal(t, pbpeerstream.Operation_OPERATION_UPSERT, msg.GetResponse().Operation)
var trustBundle pbpeering.PeeringTrustBundle var trustBundle pbpeering.PeeringTrustBundle
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &trustBundle)) require.NoError(t, msg.GetResponse().Resource.UnmarshalTo(&trustBundle))
require.ElementsMatch(t, []string{rootB.RootCert, rootC.RootCert}, trustBundle.RootPEMs) require.ElementsMatch(t, []string{rootB.RootCert, rootC.RootCert}, trustBundle.RootPEMs)
expect := connect.SpiffeIDSigningForCluster(clusterID).Host() expect := connect.SpiffeIDSigningForCluster(clusterID).Host()
@ -886,33 +886,57 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
} }
}() }()
// Issue a services subscription to server // Issue a services and roots subscription pair to server
init := &pbpeerstream.ReplicationMessage{ for _, resourceURL := range []string{
Payload: &pbpeerstream.ReplicationMessage_Request_{ pbpeerstream.TypeURLExportedService,
Request: &pbpeerstream.ReplicationMessage_Request{ pbpeerstream.TypeURLPeeringTrustBundle,
PeerID: peerID, } {
ResourceURL: pbpeerstream.TypeURLService, init := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: resourceURL,
},
}, },
}, }
require.NoError(t, client.Send(init))
} }
require.NoError(t, client.Send(init))
// Receive a services subscription from server // Receive a services and roots subscription request pair from server
receivedSub, err := client.Recv() receivedSub1, err := client.Recv()
require.NoError(t, err)
receivedSub2, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
expect := &pbpeerstream.ReplicationMessage{ expect := []*pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ {
Request: &pbpeerstream.ReplicationMessage_Request{ Payload: &pbpeerstream.ReplicationMessage_Request_{
ResourceURL: pbpeerstream.TypeURLService, Request: &pbpeerstream.ReplicationMessage_Request{
// The PeerID field is only set for the messages coming FROM ResourceURL: pbpeerstream.TypeURLExportedService,
// the establishing side and are going to be empty from the // The PeerID field is only set for the messages coming FROM
// other side. // the establishing side and are going to be empty from the
PeerID: "", // other side.
PeerID: "",
},
},
},
{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
// The PeerID field is only set for the messages coming FROM
// the establishing side and are going to be empty from the
// other side.
PeerID: "",
},
}, },
}, },
} }
prototest.AssertDeepEqual(t, expect, receivedSub) got := []*pbpeerstream.ReplicationMessage{
receivedSub1,
receivedSub2,
}
prototest.AssertElementsMatch[*pbpeerstream.ReplicationMessage](t, expect, got)
return client return client
} }
@ -1017,16 +1041,16 @@ func Test_processResponse_Validation(t *testing.T) {
{ {
name: "valid upsert", name: "valid upsert",
in: &pbpeerstream.ReplicationMessage_Response{ in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api", ResourceID: "api",
Nonce: "1", Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_UPSERT, Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: makeAnyPB(t, &pbservice.IndexedCheckServiceNodes{}), Resource: makeAnyPB(t, &pbpeerstream.ExportedService{}),
}, },
expect: &pbpeerstream.ReplicationMessage{ expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1", ResponseNonce: "1",
}, },
}, },
@ -1036,7 +1060,7 @@ func Test_processResponse_Validation(t *testing.T) {
{ {
name: "valid delete", name: "valid delete",
in: &pbpeerstream.ReplicationMessage_Response{ in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: "api", ResourceID: "api",
Nonce: "1", Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_DELETE, Operation: pbpeerstream.Operation_OPERATION_DELETE,
@ -1044,7 +1068,7 @@ func Test_processResponse_Validation(t *testing.T) {
expect: &pbpeerstream.ReplicationMessage{ expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1", ResponseNonce: "1",
}, },
}, },
@ -1075,14 +1099,14 @@ func Test_processResponse_Validation(t *testing.T) {
{ {
name: "unknown operation", name: "unknown operation",
in: &pbpeerstream.ReplicationMessage_Response{ in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
Nonce: "1", Nonce: "1",
Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED, Operation: pbpeerstream.Operation_OPERATION_UNSPECIFIED,
}, },
expect: &pbpeerstream.ReplicationMessage{ expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1", ResponseNonce: "1",
Error: &pbstatus.Status{ Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT), Code: int32(code.Code_INVALID_ARGUMENT),
@ -1096,14 +1120,14 @@ func Test_processResponse_Validation(t *testing.T) {
{ {
name: "out of range operation", name: "out of range operation",
in: &pbpeerstream.ReplicationMessage_Response{ in: &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
Nonce: "1", Nonce: "1",
Operation: pbpeerstream.Operation(100000), Operation: pbpeerstream.Operation(100000),
}, },
expect: &pbpeerstream.ReplicationMessage{ expect: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{ Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResponseNonce: "1", ResponseNonce: "1",
Error: &pbstatus.Status{ Error: &pbstatus.Status{
Code: int32(code.Code_INVALID_ARGUMENT), Code: int32(code.Code_INVALID_ARGUMENT),
@ -1163,8 +1187,8 @@ func writeInitialRootsAndCA(t *testing.T, store *state.Store) (string, *structs.
return clusterID, rootA return clusterID, rootA
} }
func makeAnyPB(t *testing.T, pb proto.Message) *any.Any { func makeAnyPB(t *testing.T, pb proto.Message) *anypb.Any {
any, err := ptypes.MarshalAny(pb) any, err := anypb.New(pb)
require.NoError(t, err) require.NoError(t, err)
return any return any
} }
@ -1255,7 +1279,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
type testCase struct { type testCase struct {
name string name string
seed []*structs.RegisterRequest seed []*structs.RegisterRequest
input *pbservice.IndexedCheckServiceNodes input *pbpeerstream.ExportedService
expect map[string]structs.CheckServiceNodes expect map[string]structs.CheckServiceNodes
expectedImportedServicesCount int expectedImportedServicesCount int
} }
@ -1296,7 +1320,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} }
in := &pbpeerstream.ReplicationMessage_Response{ in := &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService, ResourceURL: pbpeerstream.TypeURLExportedService,
ResourceID: apiSN.String(), ResourceID: apiSN.String(),
Nonce: "1", Nonce: "1",
Operation: op, Operation: op,
@ -1322,7 +1346,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
tt := []testCase{ tt := []testCase{
{ {
name: "upsert two service instances to the same node", name: "upsert two service instances to the same node",
input: &pbservice.IndexedCheckServiceNodes{ input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{ Nodes: []*pbservice.CheckServiceNode{
{ {
Node: &pbservice.Node{ Node: &pbservice.Node{
@ -1454,7 +1478,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}, },
{ {
name: "upsert two service instances to different nodes", name: "upsert two service instances to different nodes",
input: &pbservice.IndexedCheckServiceNodes{ input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{ Nodes: []*pbservice.CheckServiceNode{
{ {
Node: &pbservice.Node{ Node: &pbservice.Node{
@ -1636,7 +1660,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}, },
}, },
}, },
input: &pbservice.IndexedCheckServiceNodes{}, input: &pbpeerstream.ExportedService{},
expect: map[string]structs.CheckServiceNodes{ expect: map[string]structs.CheckServiceNodes{
"api": {}, "api": {},
}, },
@ -1695,7 +1719,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}, },
}, },
// Nil input is for the "api" service. // Nil input is for the "api" service.
input: &pbservice.IndexedCheckServiceNodes{}, input: &pbpeerstream.ExportedService{},
expect: map[string]structs.CheckServiceNodes{ expect: map[string]structs.CheckServiceNodes{
"api": {}, "api": {},
// Existing redis service was not affected by deletion. // Existing redis service was not affected by deletion.
@ -1761,7 +1785,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}, },
}, },
}, },
input: &pbservice.IndexedCheckServiceNodes{ input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{ Nodes: []*pbservice.CheckServiceNode{
{ {
Node: &pbservice.Node{ Node: &pbservice.Node{
@ -1856,7 +1880,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}, },
}, },
}, },
input: &pbservice.IndexedCheckServiceNodes{ input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{ Nodes: []*pbservice.CheckServiceNode{
{ {
Node: &pbservice.Node{ Node: &pbservice.Node{
@ -1991,7 +2015,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
}, },
}, },
}, },
input: &pbservice.IndexedCheckServiceNodes{ input: &pbpeerstream.ExportedService{
Nodes: []*pbservice.CheckServiceNode{ Nodes: []*pbservice.CheckServiceNode{
{ {
Node: &pbservice.Node{ Node: &pbservice.Node{

View File

@ -19,6 +19,13 @@ import (
// streaming machinery instead to be cheaper. // streaming machinery instead to be cheaper.
func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) { func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Context, state *subscriptionState, peerID string) {
// Wait until this is subscribed-to.
select {
case <-m.serviceSubReady:
case <-ctx.Done():
return
}
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend // syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
// match the list of services exported to the peer. // match the list of services exported to the peer.
m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) { m.syncViaBlockingQuery(ctx, "exported-services", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
@ -34,6 +41,13 @@ func (m *subscriptionManager) notifyExportedServicesForPeerID(ctx context.Contex
// TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions // TODO: add a new streaming subscription type to list-by-kind-and-partition since we're getting evictions
func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) { func (m *subscriptionManager) notifyMeshGatewaysForPartition(ctx context.Context, state *subscriptionState, partition string) {
// Wait until this is subscribed-to.
select {
case <-m.serviceSubReady:
case <-ctx.Done():
return
}
m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) { m.syncViaBlockingQuery(ctx, "mesh-gateways", func(ctx context.Context, store StateStore, ws memdb.WatchSet) (interface{}, error) {
// Fetch our current list of all mesh gateways. // Fetch our current list of all mesh gateways.
entMeta := structs.DefaultEnterpriseMetaInPartition(partition) entMeta := structs.DefaultEnterpriseMetaInPartition(partition)

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbpeerstream"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
) )
@ -33,12 +34,14 @@ type SubscriptionBackend interface {
// subscriptionManager handlers requests to subscribe to events from an events publisher. // subscriptionManager handlers requests to subscribe to events from an events publisher.
type subscriptionManager struct { type subscriptionManager struct {
logger hclog.Logger logger hclog.Logger
config Config config Config
trustDomain string trustDomain string
viewStore MaterializedViewStore viewStore MaterializedViewStore
backend SubscriptionBackend backend SubscriptionBackend
getStore func() StateStore getStore func() StateStore
serviceSubReady <-chan struct{}
trustBundlesSubReady <-chan struct{}
} }
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering. // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
@ -49,18 +52,21 @@ func newSubscriptionManager(
trustDomain string, trustDomain string,
backend SubscriptionBackend, backend SubscriptionBackend,
getStore func() StateStore, getStore func() StateStore,
remoteSubTracker *resourceSubscriptionTracker,
) *subscriptionManager { ) *subscriptionManager {
logger = logger.Named("subscriptions") logger = logger.Named("subscriptions")
store := submatview.NewStore(logger.Named("viewstore")) store := submatview.NewStore(logger.Named("viewstore"))
go store.Run(ctx) go store.Run(ctx)
return &subscriptionManager{ return &subscriptionManager{
logger: logger, logger: logger,
config: config, config: config,
trustDomain: trustDomain, trustDomain: trustDomain,
viewStore: store, viewStore: store,
backend: backend, backend: backend,
getStore: getStore, getStore: getStore,
serviceSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLExportedService),
trustBundlesSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringTrustBundle),
} }
} }
@ -297,6 +303,13 @@ func (m *subscriptionManager) notifyRootCAUpdatesForPartition(
updateCh chan<- cache.UpdateEvent, updateCh chan<- cache.UpdateEvent,
partition string, partition string,
) { ) {
// Wait until this is subscribed-to.
select {
case <-m.trustBundlesSubReady:
case <-ctx.Done():
return
}
var idx uint64 var idx uint64
// TODO(peering): retry logic; fail past a threshold // TODO(peering): retry logic; fail past a threshold
for { for {

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbpeerstream"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/prototest" "github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
@ -32,12 +33,16 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering") _, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
// Only configure a tracker for catalog events.
tracker := newResourceSubscriptionTracker()
tracker.Subscribe(pbpeerstream.TypeURLExportedService)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1", Datacenter: "dc1",
ConnectEnabled: true, ConnectEnabled: true,
}, connect.TestTrustDomain, backend, func() StateStore { }, connect.TestTrustDomain, backend, func() StateStore {
return backend.store return backend.store
}) }, tracker)
subCh := mgr.subscribe(ctx, id, "my-peering", partition) subCh := mgr.subscribe(ctx, id, "my-peering", partition)
var ( var (
@ -442,12 +447,16 @@ func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering") _, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
// Only configure a tracker for catalog events.
tracker := newResourceSubscriptionTracker()
tracker.Subscribe(pbpeerstream.TypeURLExportedService)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1", Datacenter: "dc1",
ConnectEnabled: true, ConnectEnabled: true,
}, connect.TestTrustDomain, backend, func() StateStore { }, connect.TestTrustDomain, backend, func() StateStore {
return backend.store return backend.store
}) }, tracker)
subCh := mgr.subscribe(ctx, id, "my-peering", partition) subCh := mgr.subscribe(ctx, id, "my-peering", partition)
// Register two services that are not yet exported // Register two services that are not yet exported
@ -571,21 +580,21 @@ func TestSubscriptionManager_CARoots(t *testing.T) {
_, id := backend.ensurePeering(t, "my-peering") _, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty() partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
// Only configure a tracker for CA roots events.
tracker := newResourceSubscriptionTracker()
tracker.Subscribe(pbpeerstream.TypeURLPeeringTrustBundle)
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{ mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1", Datacenter: "dc1",
ConnectEnabled: true, ConnectEnabled: true,
}, connect.TestTrustDomain, backend, func() StateStore { }, connect.TestTrustDomain, backend, func() StateStore {
return backend.store return backend.store
}) }, tracker)
subCh := mgr.subscribe(ctx, id, "my-peering", partition) subCh := mgr.subscribe(ctx, id, "my-peering", partition)
testutil.RunStep(t, "initial events contain trust bundle", func(t *testing.T) { testutil.RunStep(t, "initial events contain trust bundle", func(t *testing.T) {
// events are ordered so we can expect a deterministic list // events are ordered so we can expect a deterministic list
expectEvents(t, subCh, expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
// mesh-gateway assertions are done in other tests
require.Equal(t, subMeshGateway+partition, got.CorrelationID)
},
func(t *testing.T, got cache.UpdateEvent) { func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, subCARoot, got.CorrelationID) require.Equal(t, subCARoot, got.CorrelationID)
roots, ok := got.Result.(*pbpeering.PeeringTrustBundle) roots, ok := got.Result.(*pbpeering.PeeringTrustBundle)

View File

@ -2,6 +2,7 @@ package peerstream
import ( import (
"context" "context"
"fmt"
"io" "io"
"sync" "sync"
"time" "time"
@ -24,14 +25,7 @@ func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error {
} }
func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) { func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error) {
select { return c.RecvWithTimeout(10 * time.Millisecond)
case err := <-c.ErrCh:
return nil, err
case r := <-c.ReplicationStream.sendCh:
return r, nil
case <-time.After(10 * time.Millisecond):
return nil, io.EOF
}
} }
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) { func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error) {
@ -61,7 +55,6 @@ type MockStream struct {
recvCh chan *pbpeerstream.ReplicationMessage recvCh chan *pbpeerstream.ReplicationMessage
ctx context.Context ctx context.Context
mu sync.Mutex
} }
var _ pbpeerstream.PeerStreamService_StreamResourcesServer = (*MockStream)(nil) var _ pbpeerstream.PeerStreamService_StreamResourcesServer = (*MockStream)(nil)
@ -117,12 +110,37 @@ func (s *MockStream) SendHeader(metadata.MD) error {
// SetTrailer implements grpc.ServerStream // SetTrailer implements grpc.ServerStream
func (s *MockStream) SetTrailer(metadata.MD) {} func (s *MockStream) SetTrailer(metadata.MD) {}
// incrementalTime is an artificial clock used during testing. For those
// scenarios you would pass around the method pointer for `Now` in places where
// you would be using `time.Now`.
type incrementalTime struct { type incrementalTime struct {
base time.Time base time.Time
next uint64 next uint64
mu sync.Mutex
} }
// Now advances the internal clock by 1 second and returns that value.
func (t *incrementalTime) Now() time.Time { func (t *incrementalTime) Now() time.Time {
t.mu.Lock()
defer t.mu.Unlock()
t.next++ t.next++
return t.base.Add(time.Duration(t.next) * time.Second)
dur := time.Duration(t.next) * time.Second
return t.base.Add(dur)
}
// FutureNow will return a given future value of the Now() function.
// The numerical argument indicates which future Now value you wanted. The
// value must be > 0.
func (t *incrementalTime) FutureNow(n int) time.Time {
if n < 1 {
panic(fmt.Sprintf("argument must be > 1 but was %d", n))
}
t.mu.Lock()
defer t.mu.Unlock()
dur := time.Duration(t.next+uint64(n)) * time.Second
return t.base.Add(dur)
} }

View File

@ -0,0 +1,25 @@
package pbpeerstream
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
pbservice "github.com/hashicorp/consul/proto/pbservice"
)
// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent.
func (s *ExportedService) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) {
if s == nil {
return nil, nil
}
resp := make([]structs.CheckServiceNode, 0, len(s.Nodes))
for _, pb := range s.Nodes {
instance, err := pbservice.CheckServiceNodeToStructs(pb)
if err != nil {
return resp, fmt.Errorf("failed to convert instance: %w", err)
}
resp = append(resp, *instance)
}
return resp, nil
}

View File

@ -56,3 +56,13 @@ func (msg *LeaderAddress) MarshalBinary() ([]byte, error) {
func (msg *LeaderAddress) UnmarshalBinary(b []byte) error { func (msg *LeaderAddress) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ExportedService) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ExportedService) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

View File

@ -7,6 +7,7 @@
package pbpeerstream package pbpeerstream
import ( import (
pbservice "github.com/hashicorp/consul/proto/pbservice"
pbstatus "github.com/hashicorp/consul/proto/pbstatus" pbstatus "github.com/hashicorp/consul/proto/pbstatus"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
@ -220,6 +221,54 @@ func (x *LeaderAddress) GetAddress() string {
return "" return ""
} }
// ExportedService is one of the types of data returned via peer stream replication.
type ExportedService struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Nodes []*pbservice.CheckServiceNode `protobuf:"bytes,1,rep,name=Nodes,proto3" json:"Nodes,omitempty"`
}
func (x *ExportedService) Reset() {
*x = ExportedService{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ExportedService) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ExportedService) ProtoMessage() {}
func (x *ExportedService) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ExportedService.ProtoReflect.Descriptor instead.
func (*ExportedService) Descriptor() ([]byte, []int) {
return file_proto_pbpeerstream_peerstream_proto_rawDescGZIP(), []int{2}
}
func (x *ExportedService) GetNodes() []*pbservice.CheckServiceNode {
if x != nil {
return x.Nodes
}
return nil
}
// A Request requests to subscribe to a resource of a given type. // A Request requests to subscribe to a resource of a given type.
type ReplicationMessage_Request struct { type ReplicationMessage_Request struct {
state protoimpl.MessageState state protoimpl.MessageState
@ -244,7 +293,7 @@ type ReplicationMessage_Request struct {
func (x *ReplicationMessage_Request) Reset() { func (x *ReplicationMessage_Request) Reset() {
*x = ReplicationMessage_Request{} *x = ReplicationMessage_Request{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2] mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -257,7 +306,7 @@ func (x *ReplicationMessage_Request) String() string {
func (*ReplicationMessage_Request) ProtoMessage() {} func (*ReplicationMessage_Request) ProtoMessage() {}
func (x *ReplicationMessage_Request) ProtoReflect() protoreflect.Message { func (x *ReplicationMessage_Request) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[2] mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -323,7 +372,7 @@ type ReplicationMessage_Response struct {
func (x *ReplicationMessage_Response) Reset() { func (x *ReplicationMessage_Response) Reset() {
*x = ReplicationMessage_Response{} *x = ReplicationMessage_Response{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3] mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -336,7 +385,7 @@ func (x *ReplicationMessage_Response) String() string {
func (*ReplicationMessage_Response) ProtoMessage() {} func (*ReplicationMessage_Response) ProtoMessage() {}
func (x *ReplicationMessage_Response) ProtoReflect() protoreflect.Message { func (x *ReplicationMessage_Response) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[3] mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -398,7 +447,7 @@ type ReplicationMessage_Terminated struct {
func (x *ReplicationMessage_Terminated) Reset() { func (x *ReplicationMessage_Terminated) Reset() {
*x = ReplicationMessage_Terminated{} *x = ReplicationMessage_Terminated{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4] mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
@ -411,7 +460,7 @@ func (x *ReplicationMessage_Terminated) String() string {
func (*ReplicationMessage_Terminated) ProtoMessage() {} func (*ReplicationMessage_Terminated) ProtoMessage() {}
func (x *ReplicationMessage_Terminated) ProtoReflect() protoreflect.Message { func (x *ReplicationMessage_Terminated) ProtoReflect() protoreflect.Message {
mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[4] mi := &file_proto_pbpeerstream_peerstream_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
@ -436,92 +485,99 @@ var file_proto_pbpeerstream_peerstream_proto_rawDesc = []byte{
0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x1a, 0x19, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62,
0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x6f, 0x74, 0x6f, 0x22, 0xe5, 0x05, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x74, 0x6f, 0x1a, 0x1b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x74, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x5c, 0x0a, 0x07, 0x72, 0x65, 0x75, 0x73, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22,
0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x68, 0x61, 0xe5, 0x05, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d,
0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x5c, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63,
0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72,
0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52,
0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x5f, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x6f, 0x6e, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x68, 0x61, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x65, 0x71,
0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x75, 0x65, 0x73, 0x74, 0x12, 0x5f, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f,
0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52,
0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0a, 0x74, 0x65, 0x72,
0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, 0x2e,
0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c,
0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74,
0x65, 0x64, 0x48, 0x00, 0x52, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64,
0x1a, 0xa9, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06,
0x50, 0x65, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65,
0x65, 0x72, 0x49, 0x44, 0x12, 0x24, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65,
0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x3e, 0x0a, 0x05,
0x45, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, 0x61,
0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69,
0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53,
0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xe3, 0x01, 0x0a,
0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e,
0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12,
0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52,
0x4c, 0x12, 0x1e, 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49,
0x44, 0x12, 0x30, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75,
0x72, 0x63, 0x65, 0x12, 0x4d, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f,
0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e,
0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4f, 0x70, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65,
0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x6f, 0x6e, 0x1a, 0x0c, 0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73,
0x42, 0x09, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x4c, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61,
0x65, 0x61, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, 0x2e, 0x68, 0x61, 0x73, 0x68,
0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a, 0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73,
0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x61, 0x67, 0x65, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x48, 0x00,
0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x0a, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x1a, 0xa9, 0x01, 0x0a,
0x52, 0x54, 0x10, 0x01, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x65, 0x65, 0x72,
0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x02, 0x32, 0x9f, 0x01, 0x0a, 0x11, 0x50, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x65, 0x65, 0x72, 0x49, 0x44,
0x65, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4e, 0x6f, 0x6e, 0x63,
0x12, 0x89, 0x01, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x72, 0x63, 0x65, 0x73, 0x12, 0x38, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x65, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72,
0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x52, 0x65, 0x73,
0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x3e, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f,
0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63,
0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72,
0x6e, 0x61, 0x6c, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75,
0x73, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xe3, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52,
0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0b, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x55, 0x52, 0x4c, 0x12, 0x1e, 0x0a,
0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0a, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x44, 0x12, 0x30, 0x0a,
0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12,
0x4d, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01,
0x28, 0x0e, 0x32, 0x2f, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63,
0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70,
0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x0c,
0x0a, 0x0a, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x42, 0x09, 0x0a, 0x07,
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x64, 0x65,
0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72,
0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65,
0x73, 0x73, 0x22, 0x5c, 0x0a, 0x0f, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x53, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x05, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x01,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65,
0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x38, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x05, 0x4e, 0x6f, 0x64, 0x65, 0x73,
0x2a, 0x52, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x19, 0x0a,
0x15, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45,
0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52,
0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x12, 0x14,
0x0a, 0x10, 0x4f, 0x50, 0x45, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x44, 0x45, 0x4c, 0x45,
0x54, 0x45, 0x10, 0x02, 0x32, 0x9f, 0x01, 0x0a, 0x11, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, 0x0a, 0x0f, 0x53,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x38,
0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75,
0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x9f, 0x02, 0x0a, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x38, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69,
0x28, 0x63, 0x6f, 0x6d, 0x2e, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65,
0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e,
0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x9f, 0x02, 0x0a, 0x28, 0x63, 0x6f, 0x6d, 0x2e, 0x68,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e,
0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72,
0x70, 0x62, 0x70, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xa2, 0x02, 0x04, 0x48, 0x65, 0x61, 0x6d, 0x42, 0x0f, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50,
0x43, 0x49, 0x50, 0xaa, 0x02, 0x24, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e,
0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xca, 0x02, 0x24, 0x48, 0x61, 0x73, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x70, 0x65, 0x65, 0x72,
0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xa2, 0x02, 0x04, 0x48, 0x43, 0x49, 0x50, 0xaa, 0x02, 0x24,
0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c,
0x6d, 0xe2, 0x02, 0x30, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74,
0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, 0x72, 0x65, 0x61, 0x6d, 0xca, 0x02, 0x24, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70,
0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x27, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0xe2, 0x02, 0x30, 0x48, 0x61,
0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x5c, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x5c, 0x49,
0x61, 0x6c, 0x3a, 0x3a, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5c, 0x50, 0x65, 0x65, 0x72, 0x73, 0x74, 0x72, 0x65,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x61, 0x6d, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02,
0x27, 0x48, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x3a, 0x3a, 0x43, 0x6f, 0x6e, 0x73,
0x75, 0x6c, 0x3a, 0x3a, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x3a, 0x3a, 0x50, 0x65,
0x65, 0x72, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -537,31 +593,34 @@ func file_proto_pbpeerstream_peerstream_proto_rawDescGZIP() []byte {
} }
var file_proto_pbpeerstream_peerstream_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_proto_pbpeerstream_peerstream_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_proto_pbpeerstream_peerstream_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_proto_pbpeerstream_peerstream_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_proto_pbpeerstream_peerstream_proto_goTypes = []interface{}{ var file_proto_pbpeerstream_peerstream_proto_goTypes = []interface{}{
(Operation)(0), // 0: hashicorp.consul.internal.peerstream.Operation (Operation)(0), // 0: hashicorp.consul.internal.peerstream.Operation
(*ReplicationMessage)(nil), // 1: hashicorp.consul.internal.peerstream.ReplicationMessage (*ReplicationMessage)(nil), // 1: hashicorp.consul.internal.peerstream.ReplicationMessage
(*LeaderAddress)(nil), // 2: hashicorp.consul.internal.peerstream.LeaderAddress (*LeaderAddress)(nil), // 2: hashicorp.consul.internal.peerstream.LeaderAddress
(*ReplicationMessage_Request)(nil), // 3: hashicorp.consul.internal.peerstream.ReplicationMessage.Request (*ExportedService)(nil), // 3: hashicorp.consul.internal.peerstream.ExportedService
(*ReplicationMessage_Response)(nil), // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Response (*ReplicationMessage_Request)(nil), // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Request
(*ReplicationMessage_Terminated)(nil), // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated (*ReplicationMessage_Response)(nil), // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response
(*pbstatus.Status)(nil), // 6: hashicorp.consul.internal.status.Status (*ReplicationMessage_Terminated)(nil), // 6: hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated
(*anypb.Any)(nil), // 7: google.protobuf.Any (*pbservice.CheckServiceNode)(nil), // 7: hashicorp.consul.internal.service.CheckServiceNode
(*pbstatus.Status)(nil), // 8: hashicorp.consul.internal.status.Status
(*anypb.Any)(nil), // 9: google.protobuf.Any
} }
var file_proto_pbpeerstream_peerstream_proto_depIdxs = []int32{ var file_proto_pbpeerstream_peerstream_proto_depIdxs = []int32{
3, // 0: hashicorp.consul.internal.peerstream.ReplicationMessage.request:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Request 4, // 0: hashicorp.consul.internal.peerstream.ReplicationMessage.request:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Request
4, // 1: hashicorp.consul.internal.peerstream.ReplicationMessage.response:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Response 5, // 1: hashicorp.consul.internal.peerstream.ReplicationMessage.response:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Response
5, // 2: hashicorp.consul.internal.peerstream.ReplicationMessage.terminated:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated 6, // 2: hashicorp.consul.internal.peerstream.ReplicationMessage.terminated:type_name -> hashicorp.consul.internal.peerstream.ReplicationMessage.Terminated
6, // 3: hashicorp.consul.internal.peerstream.ReplicationMessage.Request.Error:type_name -> hashicorp.consul.internal.status.Status 7, // 3: hashicorp.consul.internal.peerstream.ExportedService.Nodes:type_name -> hashicorp.consul.internal.service.CheckServiceNode
7, // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any 8, // 4: hashicorp.consul.internal.peerstream.ReplicationMessage.Request.Error:type_name -> hashicorp.consul.internal.status.Status
0, // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.operation:type_name -> hashicorp.consul.internal.peerstream.Operation 9, // 5: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.Resource:type_name -> google.protobuf.Any
1, // 6: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:input_type -> hashicorp.consul.internal.peerstream.ReplicationMessage 0, // 6: hashicorp.consul.internal.peerstream.ReplicationMessage.Response.operation:type_name -> hashicorp.consul.internal.peerstream.Operation
1, // 7: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:output_type -> hashicorp.consul.internal.peerstream.ReplicationMessage 1, // 7: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:input_type -> hashicorp.consul.internal.peerstream.ReplicationMessage
7, // [7:8] is the sub-list for method output_type 1, // 8: hashicorp.consul.internal.peerstream.PeerStreamService.StreamResources:output_type -> hashicorp.consul.internal.peerstream.ReplicationMessage
6, // [6:7] is the sub-list for method input_type 8, // [8:9] is the sub-list for method output_type
6, // [6:6] is the sub-list for extension type_name 7, // [7:8] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension extendee 7, // [7:7] is the sub-list for extension type_name
0, // [0:6] is the sub-list for field type_name 7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
} }
func init() { file_proto_pbpeerstream_peerstream_proto_init() } func init() { file_proto_pbpeerstream_peerstream_proto_init() }
@ -595,7 +654,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
} }
} }
file_proto_pbpeerstream_peerstream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { file_proto_pbpeerstream_peerstream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Request); i { switch v := v.(*ExportedService); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -607,7 +666,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
} }
} }
file_proto_pbpeerstream_peerstream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { file_proto_pbpeerstream_peerstream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Response); i { switch v := v.(*ReplicationMessage_Request); i {
case 0: case 0:
return &v.state return &v.state
case 1: case 1:
@ -619,6 +678,18 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
} }
} }
file_proto_pbpeerstream_peerstream_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { file_proto_pbpeerstream_peerstream_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Response); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_pbpeerstream_peerstream_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReplicationMessage_Terminated); i { switch v := v.(*ReplicationMessage_Terminated); i {
case 0: case 0:
return &v.state return &v.state
@ -642,7 +713,7 @@ func file_proto_pbpeerstream_peerstream_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_pbpeerstream_peerstream_proto_rawDesc, RawDescriptor: file_proto_pbpeerstream_peerstream_proto_rawDesc,
NumEnums: 1, NumEnums: 1,
NumMessages: 5, NumMessages: 6,
NumExtensions: 0, NumExtensions: 0,
NumServices: 1, NumServices: 1,
}, },

View File

@ -3,6 +3,7 @@ syntax = "proto3";
package hashicorp.consul.internal.peerstream; package hashicorp.consul.internal.peerstream;
import "google/protobuf/any.proto"; import "google/protobuf/any.proto";
import "proto/pbservice/node.proto";
// TODO(peering): Handle this some other way // TODO(peering): Handle this some other way
import "proto/pbstatus/status.proto"; import "proto/pbstatus/status.proto";
@ -89,3 +90,8 @@ message LeaderAddress {
// address is an ip:port best effort hint at what could be the cluster leader's address // address is an ip:port best effort hint at what could be the cluster leader's address
string address = 1; string address = 1;
} }
// ExportedService is one of the types of data returned via peer stream replication.
message ExportedService {
repeated hashicorp.consul.internal.service.CheckServiceNode Nodes = 1;
}

View File

@ -1,10 +1,16 @@
package pbpeerstream package pbpeerstream
const ( const (
TypeURLService = "type.googleapis.com/consul.api.Service" apiTypePrefix = "type.googleapis.com/"
TypeURLRoots = "type.googleapis.com/consul.api.CARoots"
TypeURLExportedService = apiTypePrefix + "hashicorp.consul.internal.peerstream.ExportedService"
TypeURLPeeringTrustBundle = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringTrustBundle"
) )
func KnownTypeURL(s string) bool { func KnownTypeURL(s string) bool {
return s == TypeURLService || s == TypeURLRoots switch s {
case TypeURLExportedService, TypeURLPeeringTrustBundle:
return true
}
return false
} }

View File

@ -1,8 +1,6 @@
package pbservice package pbservice
import ( import (
"fmt"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
@ -44,23 +42,6 @@ func NewMapHeadersFromStructs(t map[string][]string) map[string]*HeaderValue {
return s return s
} }
// CheckServiceNodesToStruct converts the contained CheckServiceNodes to their structs equivalent.
func (s *IndexedCheckServiceNodes) CheckServiceNodesToStruct() ([]structs.CheckServiceNode, error) {
if s == nil {
return nil, nil
}
resp := make([]structs.CheckServiceNode, 0, len(s.Nodes))
for _, pb := range s.Nodes {
instance, err := CheckServiceNodeToStructs(pb)
if err != nil {
return resp, fmt.Errorf("failed to convert instance: %w", err)
}
resp = append(resp, *instance)
}
return resp, nil
}
// TODO: use mog once it supports pointers and slices // TODO: use mog once it supports pointers and slices
func CheckServiceNodeToStructs(s *CheckServiceNode) (*structs.CheckServiceNode, error) { func CheckServiceNodeToStructs(s *CheckServiceNode) (*structs.CheckServiceNode, error) {
if s == nil { if s == nil {

View File

@ -16,3 +16,60 @@ func AssertDeepEqual(t testing.TB, x, y interface{}, opts ...cmp.Option) {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
} }
} }
// AssertElementsMatch asserts that the specified listX(array, slice...) is
// equal to specified listY(array, slice...) ignoring the order of the
// elements. If there are duplicate elements, the number of appearances of each
// of them in both lists should match.
//
// prototest.AssertElementsMatch(t, [1, 3, 2, 3], [1, 3, 3, 2])
func AssertElementsMatch[V any](
t testing.TB, listX, listY []V, opts ...cmp.Option,
) {
t.Helper()
if len(listX) == 0 && len(listY) == 0 {
return
}
opts = append(opts, protocmp.Transform())
// dump into a map keyed by sliceID
mapX := make(map[int]V)
for i, val := range listX {
mapX[i] = val
}
mapY := make(map[int]V)
for i, val := range listY {
mapY[i] = val
}
var outX, outY []V
for i, itemX := range mapX {
for j, itemY := range mapY {
if diff := cmp.Diff(itemX, itemY, opts...); diff == "" {
outX = append(outX, itemX)
outY = append(outY, itemY)
delete(mapX, i)
delete(mapY, j)
}
}
}
if len(outX) == len(outY) && len(outX) == len(listX) {
return // matches
}
// dump remainder into the slice so we can generate a useful error
for _, itemX := range mapX {
outX = append(outX, itemX)
}
for _, itemY := range mapY {
outY = append(outY, itemY)
}
if diff := cmp.Diff(outX, outY, opts...); diff != "" {
t.Fatalf("assertion failed: slices do not have matching elements\n--- expected\n+++ actual\n%v", diff)
}
}