package peering

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/armon/go-metrics"
	"github.com/golang/protobuf/jsonpb"
	"github.com/golang/protobuf/proto"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"google.golang.org/genproto/googleapis/rpc/code"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/consul/stream"
	"github.com/hashicorp/consul/agent/dns"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/proto/pbpeering"
)

var (
	errPeeringTokenInvalidCA            = errors.New("peering token CA value is invalid")
	errPeeringTokenEmptyServerAddresses = errors.New("peering token server addresses value is empty")
	errPeeringTokenEmptyServerName      = errors.New("peering token server name value is empty")
	errPeeringTokenEmptyPeerID          = errors.New("peering token peer ID value is empty")
)
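
// These sentinel errors are surfaced by validatePeeringToken (defined
// elsewhere in this package) when a decoded token is missing required fields.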

// errPeeringInvalidServerAddress is returned when an initiate request contains
// an invalid server address.
type errPeeringInvalidServerAddress struct {
	addr string
}

// Error implements the error interface.
func (e *errPeeringInvalidServerAddress) Error() string {
	return fmt.Sprintf("%s is not a valid peering server address", e.addr)
}

type Config struct {
	Datacenter     string
	ConnectEnabled bool
}

// Service implements pbpeering.PeeringService to provide RPC operations for
// managing peering relationships.
type Service struct {
	Backend Backend
	logger  hclog.Logger
	config  Config
	streams *streamTracker
}

func NewService(logger hclog.Logger, cfg Config, backend Backend) *Service {
	return &Service{
		Backend: backend,
		logger:  logger,
		config:  cfg,
		streams: newStreamTracker(),
	}
}

var _ pbpeering.PeeringServiceServer = (*Service)(nil)

// Backend defines the core integrations the Peering endpoint depends on. A
// functional implementation will integrate with various subcomponents of Consul
// such as the State store for reading and writing data, the CA machinery for
// providing access to CA data, and the RPC system for forwarding requests to
// other servers.
type Backend interface {
	// Forward should forward the request to the leader when necessary.
	Forward(info structs.RPCInfo, f func(*grpc.ClientConn) error) (handled bool, err error)

	// GetAgentCACertificates returns the CA certificates to be returned in the peering token data.
	GetAgentCACertificates() ([]string, error)

	// GetServerAddresses returns the addresses used for establishing a peering connection.
	GetServerAddresses() ([]string, error)

	// GetServerName returns the SNI to be returned in the peering token data, which
	// will be used by peers when establishing peering connections over TLS.
	GetServerName() string

	// EncodeToken packages a peering token into a slice of bytes.
	EncodeToken(tok *structs.PeeringToken) ([]byte, error)

	// DecodeToken unpackages a peering token from a slice of bytes.
	DecodeToken([]byte) (*structs.PeeringToken, error)

	EnterpriseCheckPartitions(partition string) error

	EnterpriseCheckNamespaces(namespace string) error

	Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)

	// IsLeader indicates whether the consul server is in a leader state or not.
	IsLeader() bool

	Store() Store
	Apply() Apply
	LeaderAddress() LeaderAddress
}
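
// A minimal sketch of a token codec satisfying EncodeToken/DecodeToken
// (hypothetical, for illustration only; it assumes "encoding/json" and
// "encoding/base64" imports and is not necessarily the encoding the server
// Backend uses):
//
//	func encodeToken(tok *structs.PeeringToken) ([]byte, error) {
//		raw, err := json.Marshal(tok)
//		if err != nil {
//			return nil, err
//		}
//		return []byte(base64.StdEncoding.EncodeToString(raw)), nil
//	}
//
//	func decodeToken(data []byte) (*structs.PeeringToken, error) {
//		raw, err := base64.StdEncoding.DecodeString(string(data))
//		if err != nil {
//			return nil, err
//		}
//		var tok structs.PeeringToken
//		if err := json.Unmarshal(raw, &tok); err != nil {
//			return nil, err
//		}
//		return &tok, nil
//	}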

// LeaderAddress provides a way for the consul server to update the peering service about
// the server's leadership status.
// Server addresses should look like: ip:port
type LeaderAddress interface {
	// Set is called on a raft.LeaderObservation in a goroutine in the consul server;
	// see trackLeaderChanges().
	Set(leaderAddr string)

	// Get provides the best hint for the current address of the leader.
	// There is no guarantee that this is the actual address of the leader.
	Get() string
}
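
// A plausible implementation (sketch; the type name is illustrative and
// assumes a "sync" import) guards the address with a mutex, since Set is
// called from the leader-observation goroutine while Get is read on RPC paths:
//
//	type leaderAddr struct {
//		mu   sync.Mutex
//		addr string
//	}
//
//	func (l *leaderAddr) Set(addr string) {
//		l.mu.Lock()
//		defer l.mu.Unlock()
//		l.addr = addr
//	}
//
//	func (l *leaderAddr) Get() string {
//		l.mu.Lock()
//		defer l.mu.Unlock()
//		return l.addr
//	}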

// Store provides a read-only interface for querying Peering data.
type Store interface {
	PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
	PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
	PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
	PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
	ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, *structs.ExportedServiceList, error)
	ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
	CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
	TrustBundleListByService(ws memdb.WatchSet, service string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
	AbandonCh() <-chan struct{}
}

// Apply provides a write-only interface for persisting Peering data.
type Apply interface {
	PeeringWrite(req *pbpeering.PeeringWriteRequest) error
	PeeringDelete(req *pbpeering.PeeringDeleteRequest) error
	PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
	PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
	CatalogRegister(req *structs.RegisterRequest) error
}

// GenerateToken implements the PeeringService RPC method to generate a
// peering token, which is the initial step in establishing a peering relationship
// with other Consul clusters.
func (s *Service) GenerateToken(
	ctx context.Context,
	req *pbpeering.GenerateTokenRequest,
) (*pbpeering.GenerateTokenResponse, error) {
	if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}
	// Validate prior to forwarding to the leader; this saves a network hop.
	if err := dns.ValidateLabel(req.PeerName); err != nil {
		return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err)
	}

	if err := structs.ValidateMetaTags(req.Meta); err != nil {
		return nil, fmt.Errorf("meta tags failed validation: %w", err)
	}

	// TODO(peering): add metrics
	// TODO(peering): add tracing

	resp := &pbpeering.GenerateTokenResponse{}
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).GenerateToken(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	ca, err := s.Backend.GetAgentCACertificates()
	if err != nil {
		return nil, err
	}

	serverAddrs, err := s.Backend.GetServerAddresses()
	if err != nil {
		return nil, err
	}

	writeReq := pbpeering.PeeringWriteRequest{
		Peering: &pbpeering.Peering{
			Name: req.PeerName,
			// TODO(peering): Normalize from ACL token once this endpoint is guarded by ACLs.
			Partition: req.PartitionOrDefault(),
			Meta:      req.Meta,
		},
	}
	if err := s.Backend.Apply().PeeringWrite(&writeReq); err != nil {
		return nil, fmt.Errorf("failed to write peering: %w", err)
	}

	q := state.Query{
		Value:          strings.ToLower(req.PeerName),
		EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(req.Partition),
	}
	_, peering, err := s.Backend.Store().PeeringRead(nil, q)
	if err != nil {
		return nil, err
	}
	if peering == nil {
		return nil, fmt.Errorf("peering was deleted while token generation request was in flight")
	}

	tok := structs.PeeringToken{
		// Store the UUID so that we can do a global search when handling inbound streams.
		PeerID:          peering.ID,
		CA:              ca,
		ServerAddresses: serverAddrs,
		ServerName:      s.Backend.GetServerName(),
	}

	encoded, err := s.Backend.EncodeToken(&tok)
	if err != nil {
		return nil, err
	}
	resp.PeeringToken = string(encoded)
	return resp, nil
}
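
// Example (sketch): a client generating a token over gRPC, assuming a dialed
// *grpc.ClientConn named conn and a context ctx:
//
//	client := pbpeering.NewPeeringServiceClient(conn)
//	resp, err := client.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{
//		PeerName: "cluster-02", // the name this cluster assigns to the peer
//	})
//	if err != nil {
//		return err
//	}
//	token := resp.PeeringToken // opaque string; hand it to the other cluster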

// Initiate implements the PeeringService RPC method to finalize peering
// registration. Given a valid token output from a peer's GenerateToken endpoint,
// a peering is registered.
func (s *Service) Initiate(
	ctx context.Context,
	req *pbpeering.InitiateRequest,
) (*pbpeering.InitiateResponse, error) {
	// Validate prior to forwarding to the leader; this saves a network hop.
	if err := dns.ValidateLabel(req.PeerName); err != nil {
		return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err)
	}
	tok, err := s.Backend.DecodeToken([]byte(req.PeeringToken))
	if err != nil {
		return nil, err
	}
	if err := validatePeeringToken(tok); err != nil {
		return nil, err
	}

	if err := structs.ValidateMetaTags(req.Meta); err != nil {
		return nil, fmt.Errorf("meta tags failed validation: %w", err)
	}

	resp := &pbpeering.InitiateResponse{}
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).Initiate(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	defer metrics.MeasureSince([]string{"peering", "initiate"}, time.Now())

	// Copy the server addresses from the token so the peering record owns its own slice.
	serverAddrs := make([]string, len(tok.ServerAddresses))
	copy(serverAddrs, tok.ServerAddresses)

	// As soon as a peering is written with a list of ServerAddresses that is
	// non-empty, the leader routine will see the peering and attempt to
	// establish a connection with the remote peer.
	//
	// This peer now has a record of both the LocalPeerID(ID) and
	// RemotePeerID(PeerID) but at this point the other peer does not.
	writeReq := &pbpeering.PeeringWriteRequest{
		Peering: &pbpeering.Peering{
			Name:                req.PeerName,
			PeerCAPems:          tok.CA,
			PeerServerAddresses: serverAddrs,
			PeerServerName:      tok.ServerName,
			PeerID:              tok.PeerID,
			Meta:                req.Meta,
		},
	}
	if err = s.Backend.Apply().PeeringWrite(writeReq); err != nil {
		return nil, fmt.Errorf("failed to write peering: %w", err)
	}
	// resp.Status == 0
	return resp, nil
}
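
// Continuing the sketch above, the other cluster completes the handshake by
// presenting that token to its own servers (connB is assumed to be a
// connection to the second cluster):
//
//	_, err = pbpeering.NewPeeringServiceClient(connB).Initiate(ctx, &pbpeering.InitiateRequest{
//		PeerName:     "cluster-01", // the name the second cluster assigns to the first
//		PeeringToken: token,
//	})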

func (s *Service) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error) {
	if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}

	var resp *pbpeering.PeeringReadResponse
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringRead(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	defer metrics.MeasureSince([]string{"peering", "read"}, time.Now())
	// TODO(peering): ACL check request token

	// TODO(peering): handle blocking queries
	q := state.Query{
		Value:          strings.ToLower(req.Name),
		EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(req.Partition),
	}
	_, peering, err := s.Backend.Store().PeeringRead(nil, q)
	if err != nil {
		return nil, err
	}
	return &pbpeering.PeeringReadResponse{Peering: peering}, nil
}

func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) {
	if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}

	var resp *pbpeering.PeeringListResponse
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringList(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	defer metrics.MeasureSince([]string{"peering", "list"}, time.Now())
	// TODO(peering): ACL check request token

	// TODO(peering): handle blocking queries
	_, peerings, err := s.Backend.Store().PeeringList(nil, *structs.NodeEnterpriseMetaInPartition(req.Partition))
	if err != nil {
		return nil, err
	}
	return &pbpeering.PeeringListResponse{Peerings: peerings}, nil
}

// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
// Consider removing if we can find another way to populate the state store in peering_endpoint_test.go.
func (s *Service) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error) {
	if err := s.Backend.EnterpriseCheckPartitions(req.Peering.Partition); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}

	var resp *pbpeering.PeeringWriteResponse
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringWrite(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	defer metrics.MeasureSince([]string{"peering", "write"}, time.Now())
	// TODO(peering): ACL check request token

	// TODO(peering): handle blocking queries
	err = s.Backend.Apply().PeeringWrite(req)
	if err != nil {
		return nil, err
	}
	return &pbpeering.PeeringWriteResponse{}, nil
}

func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error) {
	if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}

	var resp *pbpeering.PeeringDeleteResponse
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringDelete(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	defer metrics.MeasureSince([]string{"peering", "delete"}, time.Now())
	// TODO(peering): ACL check request token

	// TODO(peering): handle blocking queries
	err = s.Backend.Apply().PeeringDelete(req)
	if err != nil {
		return nil, err
	}
	return &pbpeering.PeeringDeleteResponse{}, nil
}

func (s *Service) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error) {
	if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}

	var resp *pbpeering.TrustBundleReadResponse
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).TrustBundleRead(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	defer metrics.MeasureSince([]string{"peering", "trust_bundle_read"}, time.Now())
	// TODO(peering): ACL check request token

	// TODO(peering): handle blocking queries

	idx, trustBundle, err := s.Backend.Store().PeeringTrustBundleRead(nil, state.Query{
		Value:          req.Name,
		EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(req.Partition),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to read trust bundle for peer %s: %w", req.Name, err)
	}

	return &pbpeering.TrustBundleReadResponse{
		Index:  idx,
		Bundle: trustBundle,
	}, nil
}

func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) {
	if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}
	if err := s.Backend.EnterpriseCheckNamespaces(req.Namespace); err != nil {
		return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
	}

	var resp *pbpeering.TrustBundleListByServiceResponse
	handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
		var err error
		resp, err = pbpeering.NewPeeringServiceClient(conn).TrustBundleListByService(ctx, req)
		return err
	})
	if handled || err != nil {
		return resp, err
	}

	defer metrics.MeasureSince([]string{"peering", "trust_bundle_list_by_service"}, time.Now())
	// TODO(peering): ACL check request token for service:write on the service name

	// TODO(peering): handle blocking queries

	entMeta := acl.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
	idx, bundles, err := s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, entMeta)
	if err != nil {
		return nil, err
	}
	return &pbpeering.TrustBundleListByServiceResponse{Index: idx, Bundles: bundles}, nil
}

// BidirectionalStream abstracts the sending and receiving halves of a
// peering replication stream.
type BidirectionalStream interface {
	Send(*pbpeering.ReplicationMessage) error
	Recv() (*pbpeering.ReplicationMessage, error)
	Context() context.Context
}

// StreamResources handles incoming streaming connections.
func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResourcesServer) error {
	if !s.Backend.IsLeader() {
		// We are not the leader, so we will hang up on the dialer.
		s.logger.Error("cannot establish a peering stream on a follower node")

		st, err := grpcstatus.New(codes.FailedPrecondition,
			"cannot establish a peering stream on a follower node").WithDetails(
			&pbpeering.LeaderAddress{Address: s.Backend.LeaderAddress().Get()})
		if err != nil {
			s.logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
			return grpcstatus.Error(codes.FailedPrecondition, "cannot establish a peering stream on a follower node")
		}
		return st.Err()
	}

	// Initial message on a new stream must be a new subscription request.
	first, err := stream.Recv()
	if err != nil {
		s.logger.Error("failed to establish stream", "error", err)
		return err
	}

	// TODO(peering) Make request contain a list of resources, so that roots and services can be
	// subscribed to with a single request. See:
	// https://github.com/envoyproxy/data-plane-api/blob/main/envoy/service/discovery/v3/discovery.proto#L46
	req := first.GetRequest()
	if req == nil {
		return grpcstatus.Error(codes.InvalidArgument, "first message when initiating a peering must be a subscription request")
	}
	s.logger.Trace("received initial replication request from peer")
	logTraceRecv(s.logger, req)

	if req.PeerID == "" {
		return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must specify a PeerID")
	}
	if req.Nonce != "" {
		return grpcstatus.Error(codes.InvalidArgument, "initial subscription request must not contain a nonce")
	}
	if !pbpeering.KnownTypeURL(req.ResourceURL) {
		return grpcstatus.Error(codes.InvalidArgument, fmt.Sprintf("subscription request to unknown resource URL: %s", req.ResourceURL))
	}

	_, p, err := s.Backend.Store().PeeringReadByID(nil, req.PeerID)
	if err != nil {
		s.logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err)
		return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID)
	}
	if p == nil {
		return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
	}

	// TODO(peering): If the peering is marked as deleted, send a Terminated message and return
	// TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it
	s.logger.Info("accepted initial replication request from peer", "peer_id", req.PeerID)

	// For server peers both of these ID values are the same, because we generated a token with a local ID,
	// and the client peer dials using that same ID.
	return s.HandleStream(HandleStreamRequest{
		LocalID:   p.ID,
		RemoteID:  p.PeerID,
		PeerName:  p.Name,
		Partition: p.Partition,
		Stream:    stream,
	})
}
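
// A dialing peer opens the stream with a subscription request shaped like the
// one HandleStream itself sends below (sketch; peerID is the ID carried in
// the peering token):
//
//	&pbpeering.ReplicationMessage{
//		Payload: &pbpeering.ReplicationMessage_Request_{
//			Request: &pbpeering.ReplicationMessage_Request{
//				ResourceURL: pbpeering.TypeURLService,
//				PeerID:      peerID,
//			},
//		},
//	}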

type HandleStreamRequest struct {
	// LocalID is the UUID for the peering in the local Consul datacenter.
	LocalID string

	// RemoteID is the UUID for the peering from the perspective of the peer.
	RemoteID string

	// PeerName is the name of the peering.
	PeerName string

	// Partition is the local partition associated with the peer.
	Partition string

	// Stream is the open stream to the peer cluster.
	Stream BidirectionalStream
}

// HandleStream processes a replication stream for a single peering. The
// LocalID in the request is the locally-generated identifier for the peering,
// while the RemoteID is the identifier that the remote peer recognizes for it.
func (s *Service) HandleStream(req HandleStreamRequest) error {
	logger := s.logger.Named("stream").With("peer_id", req.LocalID)
	logger.Trace("handling stream for peer")

	status, err := s.streams.connected(req.LocalID)
	if err != nil {
		return fmt.Errorf("failed to register stream: %v", err)
	}

	// TODO(peering) Also need to clear subscriptions associated with the peer
	defer s.streams.disconnected(req.LocalID)

	var trustDomain string
	if s.config.ConnectEnabled {
		// Read the TrustDomain up front - we do not allow users to change the ClusterID
		// so reading it once at the beginning of the stream is sufficient.
		trustDomain, err = getTrustDomain(s.Backend.Store(), logger)
		if err != nil {
			return err
		}
	}

	mgr := newSubscriptionManager(
		req.Stream.Context(),
		logger,
		s.config,
		trustDomain,
		s.Backend,
	)
	subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.PeerName, req.Partition)

	sub := &pbpeering.ReplicationMessage{
		Payload: &pbpeering.ReplicationMessage_Request_{
			Request: &pbpeering.ReplicationMessage_Request{
				ResourceURL: pbpeering.TypeURLService,
				PeerID:      req.RemoteID,
			},
		},
	}
	logTraceSend(logger, sub)

	if err := req.Stream.Send(sub); err != nil {
		if err == io.EOF {
			logger.Info("stream ended by peer")
			status.trackReceiveError(err.Error())
			return nil
		}
		// TODO(peering) Test error handling in calls to Send/Recv
		status.trackSendError(err.Error())
		return fmt.Errorf("failed to send to stream: %v", err)
	}

	// TODO(peering): Should this be buffered?
	recvChan := make(chan *pbpeering.ReplicationMessage)
	go func() {
		defer close(recvChan)
		for {
			msg, err := req.Stream.Recv()
			if err == io.EOF {
				logger.Info("stream ended by peer")
				status.trackReceiveError(err.Error())
				return
			}
			if e, ok := grpcstatus.FromError(err); ok {
				// Cancelling the stream is not an error; it means we or our peer intended to terminate the peering.
				if e.Code() == codes.Canceled {
					return
				}
			}
			if err != nil {
				logger.Error("failed to receive from stream", "error", err)
				status.trackReceiveError(err.Error())
				return
			}

			logTraceRecv(logger, msg)
			recvChan <- msg
		}
	}()

	for {
		select {
		// When the doneCh is closed, the peering was deleted locally.
		case <-status.doneCh:
			logger.Info("ending stream")

			term := &pbpeering.ReplicationMessage{
				Payload: &pbpeering.ReplicationMessage_Terminated_{
					Terminated: &pbpeering.ReplicationMessage_Terminated{},
				},
			}
			logTraceSend(logger, term)

			if err := req.Stream.Send(term); err != nil {
				status.trackSendError(err.Error())
				return fmt.Errorf("failed to send to stream: %v", err)
			}

			logger.Trace("deleting stream status")
			s.streams.deleteStatus(req.LocalID)

			return nil

		case msg, open := <-recvChan:
			if !open {
				// No longer receiving data on the stream.
				return nil
			}

			if !s.Backend.IsLeader() {
				// We are no longer the leader, so we will hang up on the dialer.
				logger.Error("node is not a leader anymore; cannot continue streaming")

				st, err := grpcstatus.New(codes.FailedPrecondition,
					"node is not a leader anymore; cannot continue streaming").WithDetails(
					&pbpeering.LeaderAddress{Address: s.Backend.LeaderAddress().Get()})
				if err != nil {
					s.logger.Error(fmt.Sprintf("failed to marshal the leader address in response; err: %v", err))
					return grpcstatus.Error(codes.FailedPrecondition, "node is not a leader anymore; cannot continue streaming")
				}
				return st.Err()
			}

			if req := msg.GetRequest(); req != nil {
				switch {
				case req.Nonce == "":
					// TODO(peering): This can happen on a client peer since they don't try to receive subscriptions before entering HandleStream.
					// Should change that behavior or only allow it that one time.

				case req.Error != nil && (req.Error.Code != int32(code.Code_OK) || req.Error.Message != ""):
					logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message)
					status.trackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message))

				default:
					status.trackAck()
				}

				continue
			}

			if resp := msg.GetResponse(); resp != nil {
				// TODO(peering): Ensure there's a nonce
				reply, err := s.processResponse(req.PeerName, req.Partition, resp)
				if err != nil {
					logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
					status.trackReceiveError(err.Error())
				} else {
					status.trackReceiveSuccess()
				}

				logTraceSend(logger, reply)
				if err := req.Stream.Send(reply); err != nil {
					status.trackSendError(err.Error())
					return fmt.Errorf("failed to send to stream: %v", err)
				}

				continue
			}

			if term := msg.GetTerminated(); term != nil {
				logger.Info("received peering termination message, cleaning up imported resources")

				// Once marked as terminated, a separate deferred deletion routine will clean up imported resources.
				if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: req.LocalID}); err != nil {
					return err
				}
				return nil
			}

		case update := <-subCh:
			var resp *pbpeering.ReplicationMessage
			switch {
			case strings.HasPrefix(update.CorrelationID, subExportedService):
				resp = makeServiceResponse(logger, update)

			case strings.HasPrefix(update.CorrelationID, subMeshGateway):
				// TODO(Peering): figure out how to sync this separately

			case update.CorrelationID == subCARoot:
				resp = makeCARootsResponse(logger, update)

			default:
				logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
				continue
			}
			if resp == nil {
				continue
			}
			logTraceSend(logger, resp)
			if err := req.Stream.Send(resp); err != nil {
				status.trackSendError(err.Error())
				return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
			}
		}
	}
}

func getTrustDomain(store Store, logger hclog.Logger) (string, error) {
	_, cfg, err := store.CAConfig(nil)
	switch {
	case err != nil:
		logger.Error("failed to read Connect CA Config", "error", err)
		return "", grpcstatus.Error(codes.Internal, "failed to read Connect CA Config")
	case cfg == nil:
		logger.Warn("cannot begin stream because Connect CA is not yet initialized")
		return "", grpcstatus.Error(codes.FailedPrecondition, "Connect CA is not yet initialized")
	}
	return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
}
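
// The returned trust domain is the host portion of the cluster's SPIFFE
// signing authority; in practice it looks like "<cluster-id>.consul".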

// StreamStatus returns the status of the stream to the given peer, if one exists.
func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) {
	return s.streams.streamStatus(peer)
}

// ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.
func (s *Service) ConnectedStreams() map[string]chan struct{} {
	return s.streams.connectedStreams()
}

func logTraceRecv(logger hclog.Logger, pb proto.Message) {
	logTraceProto(logger, pb, true)
}

func logTraceSend(logger hclog.Logger, pb proto.Message) {
	logTraceProto(logger, pb, false)
}

func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
	if !logger.IsTrace() {
		return
	}

	dir := "sent"
	if received {
		dir = "received"
	}

	m := jsonpb.Marshaler{
		Indent: "  ",
	}
	out, err := m.MarshalToString(pb)
	if err != nil {
		out = "<ERROR: " + err.Error() + ">"
	}

	logger.Trace("replication message", "direction", dir, "protobuf", out)
}