mirror of https://github.com/status-im/consul.git
976 lines
31 KiB
Go
976 lines
31 KiB
Go
package peering
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/go-multierror"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
grpcstatus "google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/acl/resolver"
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
|
"github.com/hashicorp/consul/agent/dns"
|
|
external "github.com/hashicorp/consul/agent/grpc-external"
|
|
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/lib"
|
|
"github.com/hashicorp/consul/proto/pbpeering"
|
|
"github.com/hashicorp/consul/proto/pbpeerstream"
|
|
)
|
|
|
|
var (
|
|
errPeeringTokenInvalidCA = errors.New("peering token CA value is invalid")
|
|
errPeeringTokenEmptyServerAddresses = errors.New("peering token server addresses value is empty")
|
|
errPeeringTokenEmptyServerName = errors.New("peering token server name value is empty")
|
|
errPeeringTokenEmptyPeerID = errors.New("peering token peer ID value is empty")
|
|
)
|
|
|
|
// errPeeringInvalidServerAddress is returned when an establish request contains
|
|
// an invalid server address.
|
|
type errPeeringInvalidServerAddress struct {
|
|
addr string
|
|
}
|
|
|
|
// Error implements the error interface
|
|
func (e *errPeeringInvalidServerAddress) Error() string {
|
|
return fmt.Sprintf("%s is not a valid peering server address", e.addr)
|
|
}
|
|
|
|
// For private/internal gRPC handlers, protoc-gen-rpc-glue generates the
|
|
// requisite methods to satisfy the structs.RPCInfo interface using fields
|
|
// from the pbcommon package. This service is public, so we can't use those
|
|
// fields in our proto definition. Instead, we construct our RPCInfo manually.
|
|
var writeRequest struct {
|
|
structs.WriteRequest
|
|
structs.DCSpecificRequest
|
|
}
|
|
|
|
var readRequest struct {
|
|
structs.QueryOptions
|
|
structs.DCSpecificRequest
|
|
}
|
|
|
|
// Server implements pbpeering.PeeringService to provide RPC operations for
|
|
// managing peering relationships.
|
|
type Server struct {
|
|
Config
|
|
}
|
|
|
|
type Config struct {
|
|
Backend Backend
|
|
Tracker *peerstream.Tracker
|
|
Logger hclog.Logger
|
|
ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
|
|
Datacenter string
|
|
ConnectEnabled bool
|
|
PeeringEnabled bool
|
|
}
|
|
|
|
func NewServer(cfg Config) *Server {
|
|
requireNotNil(cfg.Backend, "Backend")
|
|
requireNotNil(cfg.Tracker, "Tracker")
|
|
requireNotNil(cfg.Logger, "Logger")
|
|
requireNotNil(cfg.ForwardRPC, "ForwardRPC")
|
|
if cfg.Datacenter == "" {
|
|
panic("Datacenter is required")
|
|
}
|
|
return &Server{
|
|
Config: cfg,
|
|
}
|
|
}
|
|
|
|
func requireNotNil(v interface{}, name string) {
|
|
if v == nil {
|
|
panic(name + " is required")
|
|
}
|
|
}
|
|
|
|
var _ pbpeering.PeeringServiceServer = (*Server)(nil)
|
|
|
|
func (s *Server) Register(grpcServer *grpc.Server) {
|
|
pbpeering.RegisterPeeringServiceServer(grpcServer, s)
|
|
}
|
|
|
|
// Backend defines the core integrations the Peering endpoint depends on. A
|
|
// functional implementation will integrate with various subcomponents of Consul
|
|
// such as the State store for reading and writing data, the CA machinery for
|
|
// providing access to CA data and the RPC system for forwarding requests to
|
|
// other servers.
|
|
type Backend interface {
|
|
// ResolveTokenAndDefaultMeta returns an acl.Authorizer which authorizes
|
|
// actions based on the permissions granted to the token.
|
|
// If either entMeta or authzContext are non-nil they will be populated with the
|
|
// partition and namespace from the token.
|
|
ResolveTokenAndDefaultMeta(token string, entMeta *acl.EnterpriseMeta, authzCtx *acl.AuthorizerContext) (resolver.Result, error)
|
|
|
|
// GetTLSMaterials returns the TLS materials for the dialer to dial the acceptor using TLS.
|
|
// It returns the server name to validate, and the CA certificate to validate with.
|
|
GetTLSMaterials(generatingToken bool) (string, []string, error)
|
|
|
|
// GetServerAddresses returns the addresses used for establishing a peering connection.
|
|
// These may be server addresses or mesh gateway addresses if peering through mesh gateways.
|
|
GetServerAddresses() ([]string, error)
|
|
|
|
// EncodeToken packages a peering token into a slice of bytes.
|
|
EncodeToken(tok *structs.PeeringToken) ([]byte, error)
|
|
|
|
// DecodeToken unpackages a peering token from a slice of bytes.
|
|
DecodeToken([]byte) (*structs.PeeringToken, error)
|
|
|
|
EnterpriseCheckPartitions(partition string) error
|
|
|
|
EnterpriseCheckNamespaces(namespace string) error
|
|
|
|
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
|
|
|
|
// IsLeader indicates whether the consul server is in a leader state or not.
|
|
IsLeader() bool
|
|
|
|
// SetLeaderAddress is called on a raft.LeaderObservation in a go routine
|
|
// in the consul server; see trackLeaderChanges()
|
|
SetLeaderAddress(string)
|
|
|
|
// GetLeaderAddress provides the best hint for the current address of the
|
|
// leader. There is no guarantee that this is the actual address of the
|
|
// leader.
|
|
GetLeaderAddress() string
|
|
|
|
// CheckPeeringUUID returns true if the given UUID is not associated with
|
|
// an existing peering.
|
|
CheckPeeringUUID(id string) (bool, error)
|
|
|
|
ValidateProposedPeeringSecret(id string) (bool, error)
|
|
|
|
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
|
|
|
|
Store() Store
|
|
}
|
|
|
|
// Store provides a read-only interface for querying Peering data.
|
|
type Store interface {
|
|
PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
|
|
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
|
|
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
|
|
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
|
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
|
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
|
}
|
|
|
|
var peeringNotEnabledErr = grpcstatus.Error(codes.FailedPrecondition, "peering must be enabled to use this endpoint")
|
|
|
|
// GenerateToken implements the PeeringService RPC method to generate a
|
|
// peering token which is the initial step in establishing a peering relationship
|
|
// with other Consul clusters.
|
|
func (s *Server) GenerateToken(
|
|
ctx context.Context,
|
|
req *pbpeering.GenerateTokenRequest,
|
|
) (*pbpeering.GenerateTokenResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
// validate prior to forwarding to the leader, this saves a network hop
|
|
if err := dns.ValidateLabel(req.PeerName); err != nil {
|
|
return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err)
|
|
}
|
|
|
|
if err := structs.ValidateMetaTags(req.Meta); err != nil {
|
|
return nil, fmt.Errorf("meta tags failed validation: %w", err)
|
|
}
|
|
|
|
resp := &pbpeering.GenerateTokenResponse{}
|
|
handled, err := s.ForwardRPC(&writeRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).GenerateToken(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "generate_token"}, time.Now())
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition)
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().PeeringWriteAllowed(&authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
serverName, caPEMs, err := s.Backend.GetTLSMaterials(true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var (
|
|
peering *pbpeering.Peering
|
|
secretID string
|
|
)
|
|
|
|
// This loop ensures at most one retry in the case of a race condition.
|
|
for canRetry := true; canRetry; canRetry = false {
|
|
peering, err = s.getExistingPeering(req.PeerName, entMeta.PartitionOrDefault())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if peering == nil {
|
|
id, err := lib.GenerateUUID(s.Backend.CheckPeeringUUID)
|
|
if err != nil {
|
|
return resp, err
|
|
}
|
|
peering = &pbpeering.Peering{
|
|
ID: id,
|
|
Name: req.PeerName,
|
|
Meta: req.Meta,
|
|
|
|
// PartitionOrEmpty is used to avoid writing "default" in OSS.
|
|
Partition: entMeta.PartitionOrEmpty(),
|
|
}
|
|
} else {
|
|
// validate that this peer name is not being used as a dialer already
|
|
if err := validatePeer(peering, false); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// A new establishment secret is generated on every GenerateToken request.
|
|
// This allows for rotating secrets by generating a new token for a peering and then
|
|
// using the new token to re-establish the peering.
|
|
secretID, err = s.generateNewEstablishmentSecret()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to generate secret for peering establishment: %w", err)
|
|
}
|
|
|
|
writeReq := &pbpeering.PeeringWriteRequest{
|
|
Peering: peering,
|
|
SecretsRequest: &pbpeering.SecretsWriteRequest{
|
|
PeerID: peering.ID,
|
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
|
EstablishmentSecret: secretID,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
if err := s.Backend.PeeringWrite(writeReq); err != nil {
|
|
// There's a possible race where two servers call Generate Token at the
|
|
// same time with the same peer name for the first time. They both
|
|
// generate an ID and try to insert and only one wins. This detects the
|
|
// collision and forces the loser to discard its generated ID and use
|
|
// the one from the other server.
|
|
if strings.Contains(err.Error(), "A peering already exists with the name") {
|
|
// retry to fetch existing peering
|
|
continue
|
|
}
|
|
return nil, fmt.Errorf("failed to write peering: %w", err)
|
|
}
|
|
// write succeeded, break loop early
|
|
break
|
|
}
|
|
|
|
// ServerExternalAddresses must be formatted as addr:port.
|
|
var serverAddrs []string
|
|
if len(req.ServerExternalAddresses) > 0 {
|
|
serverAddrs = req.ServerExternalAddresses
|
|
} else {
|
|
serverAddrs, err = s.Backend.GetServerAddresses()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
tok := structs.PeeringToken{
|
|
// Store the UUID so that we can do a global search when handling inbound streams.
|
|
PeerID: peering.ID,
|
|
CA: caPEMs,
|
|
ServerAddresses: serverAddrs,
|
|
ServerName: serverName,
|
|
EstablishmentSecret: secretID,
|
|
Remote: structs.PeeringTokenRemote{
|
|
Partition: req.PartitionOrDefault(),
|
|
Datacenter: s.Datacenter,
|
|
},
|
|
}
|
|
|
|
encoded, err := s.Backend.EncodeToken(&tok)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
resp.PeeringToken = string(encoded)
|
|
return resp, err
|
|
}
|
|
|
|
// Establish implements the PeeringService RPC method to finalize peering
|
|
// registration. Given a valid token output from a peer's GenerateToken endpoint,
|
|
// a peering is registered.
|
|
func (s *Server) Establish(
|
|
ctx context.Context,
|
|
req *pbpeering.EstablishRequest,
|
|
) (*pbpeering.EstablishResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
// validate prior to forwarding to the leader, this saves a network hop
|
|
if err := dns.ValidateLabel(req.PeerName); err != nil {
|
|
return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err)
|
|
}
|
|
tok, err := s.Backend.DecodeToken([]byte(req.PeeringToken))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := validatePeeringToken(tok); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := structs.ValidateMetaTags(req.Meta); err != nil {
|
|
return nil, fmt.Errorf("meta tags failed validation: %w", err)
|
|
}
|
|
|
|
resp := &pbpeering.EstablishResponse{}
|
|
handled, err := s.ForwardRPC(&writeRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).Establish(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "establish"}, time.Now())
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition)
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().PeeringWriteAllowed(&authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
existing, err := s.getExistingPeering(req.PeerName, entMeta.PartitionOrDefault())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.validatePeeringLocality(tok); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var id string
|
|
serverAddrs := tok.ServerAddresses
|
|
if existing == nil {
|
|
id, err = lib.GenerateUUID(s.Backend.CheckPeeringUUID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
id = existing.ID
|
|
// If there is a connected stream, assume that the existing ServerAddresses
|
|
// are up to date and do not try to overwrite them with the token's addresses.
|
|
if status, ok := s.Tracker.StreamStatus(id); ok && status.Connected {
|
|
serverAddrs = existing.PeerServerAddresses
|
|
}
|
|
}
|
|
|
|
// validate that this peer name is not being used as an acceptor already
|
|
if err := validatePeer(existing, true); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
peering := &pbpeering.Peering{
|
|
ID: id,
|
|
Name: req.PeerName,
|
|
PeerCAPems: tok.CA,
|
|
PeerServerAddresses: serverAddrs,
|
|
PeerServerName: tok.ServerName,
|
|
PeerID: tok.PeerID,
|
|
Meta: req.Meta,
|
|
State: pbpeering.PeeringState_ESTABLISHING,
|
|
// PartitionOrEmpty is used to avoid writing "default" in OSS.
|
|
Partition: entMeta.PartitionOrEmpty(),
|
|
Remote: &pbpeering.RemoteInfo{
|
|
Partition: tok.Remote.Partition,
|
|
Datacenter: tok.Remote.Datacenter,
|
|
},
|
|
}
|
|
|
|
tlsOption, err := peering.TLSDialOption()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to build TLS dial option from peering: %w", err)
|
|
}
|
|
|
|
exchangeReq := pbpeerstream.ExchangeSecretRequest{
|
|
PeerID: peering.PeerID,
|
|
EstablishmentSecret: tok.EstablishmentSecret,
|
|
}
|
|
var exchangeResp *pbpeerstream.ExchangeSecretResponse
|
|
|
|
// Loop through the known server addresses once, attempting to fetch the long-lived stream secret.
|
|
var dialErrors error
|
|
for _, addr := range serverAddrs {
|
|
exchangeResp, err = exchangeSecret(ctx, addr, tlsOption, &exchangeReq)
|
|
if err != nil {
|
|
dialErrors = multierror.Append(dialErrors, fmt.Errorf("failed to exchange peering secret with %q: %w", addr, err))
|
|
}
|
|
if exchangeResp != nil {
|
|
break
|
|
}
|
|
}
|
|
if exchangeResp == nil {
|
|
return nil, dialErrors
|
|
}
|
|
|
|
// As soon as a peering is written with a non-empty list of ServerAddresses
|
|
// and an active stream secret, a leader routine will see the peering and
|
|
// attempt to establish a peering stream with the remote peer.
|
|
//
|
|
// This peer now has a record of both the LocalPeerID(ID) and
|
|
// RemotePeerID(PeerID) but at this point the other peer does not.
|
|
writeReq := &pbpeering.PeeringWriteRequest{
|
|
Peering: peering,
|
|
SecretsRequest: &pbpeering.SecretsWriteRequest{
|
|
PeerID: peering.ID,
|
|
Request: &pbpeering.SecretsWriteRequest_Establish{
|
|
Establish: &pbpeering.SecretsWriteRequest_EstablishRequest{
|
|
ActiveStreamSecret: exchangeResp.StreamSecret,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
if err := s.Backend.PeeringWrite(writeReq); err != nil {
|
|
return nil, fmt.Errorf("failed to write peering: %w", err)
|
|
}
|
|
// TODO(peering): low prio: consider adding response details
|
|
return resp, nil
|
|
}
|
|
|
|
// validatePeeringLocality makes sure that we don't create a peering in the same cluster it was generated.
|
|
// If the ServerName of the PeeringToken overlaps with our own, we do not accept it.
|
|
func (s *Server) validatePeeringLocality(token *structs.PeeringToken) error {
|
|
serverName, _, err := s.Backend.GetTLSMaterials(false)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch TLS materials: %w", err)
|
|
}
|
|
if serverName == token.ServerName {
|
|
return fmt.Errorf(
|
|
"cannot create a peering within the same cluster %q. Refer to the `exported-services` documentation if you want to export between partitions without peering",
|
|
serverName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func exchangeSecret(ctx context.Context, addr string, tlsOption grpc.DialOption, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error) {
|
|
dialCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
conn, err := grpc.DialContext(dialCtx, addr,
|
|
tlsOption,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to dial peer: %w", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
client := pbpeerstream.NewPeerStreamServiceClient(conn)
|
|
return client.ExchangeSecret(ctx, req)
|
|
}
|
|
|
|
// OPTIMIZE: Handle blocking queries
|
|
func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
var resp *pbpeering.PeeringReadResponse
|
|
handled, err := s.ForwardRPC(&readRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringRead(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "read"}, time.Now())
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition)
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().PeeringReadAllowed(&authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
q := state.Query{
|
|
Value: strings.ToLower(req.Name),
|
|
EnterpriseMeta: *entMeta,
|
|
}
|
|
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if peering == nil {
|
|
return &pbpeering.PeeringReadResponse{Peering: nil}, nil
|
|
}
|
|
|
|
cp := s.reconcilePeering(peering)
|
|
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
|
|
}
|
|
|
|
// OPTIMIZE: Handle blocking queries
|
|
func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
var resp *pbpeering.PeeringListResponse
|
|
handled, err := s.ForwardRPC(&readRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringList(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition)
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().PeeringReadAllowed(&authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "list"}, time.Now())
|
|
|
|
idx, peerings, err := s.Backend.Store().PeeringList(nil, *entMeta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// reconcile the actual peering state; need to copy over the ds for peering
|
|
var cPeerings []*pbpeering.Peering
|
|
for _, p := range peerings {
|
|
cp := s.reconcilePeering(p)
|
|
cPeerings = append(cPeerings, cp)
|
|
}
|
|
|
|
return &pbpeering.PeeringListResponse{Peerings: cPeerings, Index: idx}, nil
|
|
}
|
|
|
|
// TODO(peering): Get rid of this func when we stop using the stream tracker for imported/ exported services and the peering state
|
|
// reconcilePeering enriches the peering with the following information:
|
|
// -- PeeringState.Active if the peering is active
|
|
// -- ImportedServicesCount and ExportedServicesCount
|
|
// NOTE: we return a new peering with this additional data
|
|
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
|
|
streamState, found := s.Tracker.StreamStatus(peering.ID)
|
|
if !found {
|
|
// TODO(peering): this may be noise on non-leaders
|
|
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
|
|
" exported services count or reconcile peering state", "peerID", peering.ID)
|
|
peering.StreamStatus = &pbpeering.StreamStatus{}
|
|
return peering
|
|
} else {
|
|
cp := copyPeering(peering)
|
|
|
|
// reconcile pbpeering.PeeringState_Active
|
|
if streamState.Connected {
|
|
cp.State = pbpeering.PeeringState_ACTIVE
|
|
} else if streamState.DisconnectErrorMessage != "" {
|
|
cp.State = pbpeering.PeeringState_FAILING
|
|
}
|
|
|
|
latest := func(tt ...time.Time) time.Time {
|
|
latest := time.Time{}
|
|
for _, t := range tt {
|
|
if t.After(latest) {
|
|
latest = t
|
|
}
|
|
}
|
|
return latest
|
|
}
|
|
|
|
lastRecv := latest(streamState.LastRecvHeartbeat, streamState.LastRecvError, streamState.LastRecvResourceSuccess)
|
|
lastSend := latest(streamState.LastSendError, streamState.LastSendSuccess)
|
|
|
|
cp.StreamStatus = &pbpeering.StreamStatus{
|
|
ImportedServices: streamState.ImportedServices,
|
|
ExportedServices: streamState.ExportedServices,
|
|
LastHeartbeat: structs.TimeToProto(streamState.LastRecvHeartbeat),
|
|
LastReceive: structs.TimeToProto(lastRecv),
|
|
LastSend: structs.TimeToProto(lastSend),
|
|
}
|
|
|
|
return cp
|
|
}
|
|
}
|
|
|
|
// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
|
|
// Consider removing if we can find another way to populate state store in peering_endpoint_test.go
|
|
func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
if err := s.Backend.EnterpriseCheckPartitions(req.Peering.Partition); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
var resp *pbpeering.PeeringWriteResponse
|
|
handled, err := s.ForwardRPC(&writeRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringWrite(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "write"}, time.Now())
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := structs.DefaultEnterpriseMetaInPartition(req.Peering.Partition)
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().PeeringWriteAllowed(&authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if req.Peering == nil {
|
|
return nil, fmt.Errorf("missing required peering body")
|
|
}
|
|
|
|
var id string
|
|
peering, err := s.getExistingPeering(req.Peering.Name, entMeta.PartitionOrDefault())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if peering == nil {
|
|
id, err = lib.GenerateUUID(s.Backend.CheckPeeringUUID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
id = peering.ID
|
|
}
|
|
req.Peering.ID = id
|
|
|
|
err = s.Backend.PeeringWrite(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pbpeering.PeeringWriteResponse{}, nil
|
|
}
|
|
|
|
func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
var resp *pbpeering.PeeringDeleteResponse
|
|
handled, err := s.ForwardRPC(&writeRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).PeeringDelete(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "delete"}, time.Now())
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition)
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().PeeringWriteAllowed(&authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
q := state.Query{
|
|
Value: strings.ToLower(req.Name),
|
|
EnterpriseMeta: *entMeta,
|
|
}
|
|
_, existing, err := s.Backend.Store().PeeringRead(nil, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if existing == nil || existing.State == pbpeering.PeeringState_DELETING {
|
|
// Return early when the Peering doesn't exist or is already marked for deletion.
|
|
// We don't return nil because the pb will fail to marshal.
|
|
return &pbpeering.PeeringDeleteResponse{}, nil
|
|
}
|
|
|
|
// We are using a write request due to needing to perform a deferred deletion.
|
|
// The peering gets marked for deletion by setting the DeletedAt field,
|
|
// and a leader routine will handle deleting the peering.
|
|
writeReq := &pbpeering.PeeringWriteRequest{
|
|
Peering: &pbpeering.Peering{
|
|
// We only need to include the name and partition for the peering to be identified.
|
|
// All other data associated with the peering can be discarded because once marked
|
|
// for deletion the peering is effectively gone.
|
|
ID: existing.ID,
|
|
Name: req.Name,
|
|
State: pbpeering.PeeringState_DELETING,
|
|
PeerServerAddresses: existing.PeerServerAddresses,
|
|
DeletedAt: structs.TimeToProto(time.Now().UTC()),
|
|
|
|
// PartitionOrEmpty is used to avoid writing "default" in OSS.
|
|
Partition: entMeta.PartitionOrEmpty(),
|
|
},
|
|
}
|
|
err = s.Backend.PeeringWrite(writeReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pbpeering.PeeringDeleteResponse{}, nil
|
|
}
|
|
|
|
// OPTIMIZE: Handle blocking queries
|
|
func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var resp *pbpeering.TrustBundleReadResponse
|
|
handled, err := s.ForwardRPC(&readRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).TrustBundleRead(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "trust_bundle_read"}, time.Now())
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := structs.DefaultEnterpriseMetaInPartition(req.Partition)
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().ServiceWriteAnyAllowed(&authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
idx, trustBundle, err := s.Backend.Store().PeeringTrustBundleRead(nil, state.Query{
|
|
Value: req.Name,
|
|
EnterpriseMeta: *entMeta,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read trust bundle for peer %s: %w", req.Name, err)
|
|
}
|
|
|
|
return &pbpeering.TrustBundleReadResponse{
|
|
Index: idx,
|
|
Bundle: trustBundle,
|
|
}, nil
|
|
}
|
|
|
|
// TODO(peering): rename rpc & request/response to drop the "service" part
|
|
// OPTIMIZE: Handle blocking queries
|
|
func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) {
|
|
if !s.Config.PeeringEnabled {
|
|
return nil, peeringNotEnabledErr
|
|
}
|
|
|
|
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
if err := s.Backend.EnterpriseCheckNamespaces(req.Namespace); err != nil {
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
if req.ServiceName == "" {
|
|
return nil, errors.New("missing service name")
|
|
}
|
|
|
|
var resp *pbpeering.TrustBundleListByServiceResponse
|
|
handled, err := s.ForwardRPC(&readRequest, func(conn *grpc.ClientConn) error {
|
|
ctx := external.ForwardMetadataContext(ctx)
|
|
var err error
|
|
resp, err = pbpeering.NewPeeringServiceClient(conn).TrustBundleListByService(ctx, req)
|
|
return err
|
|
})
|
|
if handled || err != nil {
|
|
return resp, err
|
|
}
|
|
|
|
defer metrics.MeasureSince([]string{"peering", "trust_bundle_list_by_service"}, time.Now())
|
|
|
|
var authzCtx acl.AuthorizerContext
|
|
entMeta := acl.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
|
|
options, err := external.QueryOptionsFromContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
authz, err := s.Backend.ResolveTokenAndDefaultMeta(options.Token, &entMeta, &authzCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(req.ServiceName, &authzCtx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var (
|
|
idx uint64
|
|
bundles []*pbpeering.PeeringTrustBundle
|
|
)
|
|
|
|
switch {
|
|
case req.Kind == string(structs.ServiceKindMeshGateway):
|
|
idx, bundles, err = s.Backend.Store().PeeringTrustBundleList(nil, entMeta)
|
|
case req.ServiceName != "":
|
|
idx, bundles, err = s.Backend.Store().TrustBundleListByService(nil, req.ServiceName, s.Datacenter, entMeta)
|
|
case req.Kind != "":
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, "kind must be mesh-gateway if set")
|
|
default:
|
|
return nil, grpcstatus.Error(codes.InvalidArgument, "one of service or kind is required")
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pbpeering.TrustBundleListByServiceResponse{Index: idx, Bundles: bundles}, nil
|
|
}
|
|
|
|
func (s *Server) getExistingPeering(peerName, partition string) (*pbpeering.Peering, error) {
|
|
q := state.Query{
|
|
Value: strings.ToLower(peerName),
|
|
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition),
|
|
}
|
|
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return peering, nil
|
|
}
|
|
|
|
func (s *Server) generateNewEstablishmentSecret() (string, error) {
|
|
id, err := lib.GenerateUUID(s.Backend.ValidateProposedPeeringSecret)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
// validatePeer enforces the following rule for an existing peering:
|
|
// - if a peering already exists, it can only be used as an acceptor or dialer
|
|
//
|
|
// We define a DIALER as a peering that has server addresses (or a peering that is created via the Establish endpoint)
|
|
// Conversely, we define an ACCEPTOR as a peering that is created via the GenerateToken endpoint
|
|
func validatePeer(peering *pbpeering.Peering, shouldDial bool) error {
|
|
if peering != nil && peering.ShouldDial() != shouldDial {
|
|
if shouldDial {
|
|
return fmt.Errorf("cannot create peering with name: %q; there is an existing peering expecting to be dialed", peering.Name)
|
|
} else {
|
|
return fmt.Errorf("cannot create peering with name: %q; there is already an established peering", peering.Name)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
|
|
var copyP pbpeering.Peering
|
|
proto.Merge(©P, p)
|
|
|
|
return ©P
|
|
}
|