376 lines
10 KiB
Go

package pbpeering
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/private/pbcommon"
)
// RequestDatacenter implements structs.RPCInfo
func (req *GenerateTokenRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (req *EstablishRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (req *PeeringReadRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (req *PeeringListRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (req *PeeringWriteRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (req *PeeringDeleteRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (req *TrustBundleReadRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (req *TrustBundleListByServiceRequest) RequestDatacenter() string {
// Cross-datacenter requests are not allowed for peering actions because
// they rely on WAN-federation.
return ""
}
// ShouldDial returns true when the peering was stored via the peering initiation endpoint,
// AND the peering is not marked as terminated by our peer.
// If we generated a token for this peer we did not store our server addresses under PeerServerAddresses or ManualServerAddresses.
// These server addresses are for dialing, and only the peer initiating the peering will do the dialing.
func (p *Peering) ShouldDial() bool {
return len(p.PeerServerAddresses) > 0 || len(p.ManualServerAddresses) > 0
}
// GetAddressesToDial returns the listing of addresses that should be dialed for the peering.
// It will ensure that manual addresses take precedence, if any are defined.
func (p *Peering) GetAddressesToDial() []string {
if len(p.ManualServerAddresses) > 0 {
return p.ManualServerAddresses
}
return p.PeerServerAddresses
}
func (x PeeringState) GoString() string {
return x.String()
}
// ConcatenatedRootPEMs concatenates and returns all PEM-encoded public certificates
// in a peer's trust bundle.
func (b *PeeringTrustBundle) ConcatenatedRootPEMs() string {
if b == nil {
return ""
}
var rootPEMs string
for _, pem := range b.RootPEMs {
rootPEMs += lib.EnsureTrailingNewline(pem)
}
return rootPEMs
}
// enumcover:PeeringState
func PeeringStateToAPI(s PeeringState) api.PeeringState {
switch s {
case PeeringState_PENDING:
return api.PeeringStatePending
case PeeringState_ESTABLISHING:
return api.PeeringStateEstablishing
case PeeringState_ACTIVE:
return api.PeeringStateActive
case PeeringState_FAILING:
return api.PeeringStateFailing
case PeeringState_DELETING:
return api.PeeringStateDeleting
case PeeringState_TERMINATED:
return api.PeeringStateTerminated
case PeeringState_UNDEFINED:
fallthrough
default:
return api.PeeringStateUndefined
}
}
// enumcover:api.PeeringState
func PeeringStateFromAPI(t api.PeeringState) PeeringState {
switch t {
case api.PeeringStatePending:
return PeeringState_PENDING
case api.PeeringStateEstablishing:
return PeeringState_ESTABLISHING
case api.PeeringStateActive:
return PeeringState_ACTIVE
case api.PeeringStateFailing:
return PeeringState_FAILING
case api.PeeringStateDeleting:
return PeeringState_DELETING
case api.PeeringStateTerminated:
return PeeringState_TERMINATED
case api.PeeringStateUndefined:
fallthrough
default:
return PeeringState_UNDEFINED
}
}
func StreamStatusToAPI(status *StreamStatus) api.PeeringStreamStatus {
return api.PeeringStreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: TimePtrFromProto(status.LastHeartbeat),
LastReceive: TimePtrFromProto(status.LastReceive),
LastSend: TimePtrFromProto(status.LastSend),
}
}
func StreamStatusFromAPI(status api.PeeringStreamStatus) *StreamStatus {
return &StreamStatus{
ImportedServices: status.ImportedServices,
ExportedServices: status.ExportedServices,
LastHeartbeat: TimePtrToProto(status.LastHeartbeat),
LastReceive: TimePtrToProto(status.LastReceive),
LastSend: TimePtrToProto(status.LastSend),
}
}
func (p *Peering) IsActive() bool {
if p == nil || p.State == PeeringState_TERMINATED {
return false
}
if p.DeletedAt == nil {
return true
}
// The minimum protobuf timestamp is the Unix epoch rather than go's zero.
return structs.IsZeroProtoTime(p.DeletedAt)
}
// Validate is a validation helper that checks whether a secret ID is embedded in the container type.
func (s *SecretsWriteRequest) Validate() error {
if s.PeerID == "" {
return errors.New("missing peer ID")
}
switch r := s.Request.(type) {
case *SecretsWriteRequest_GenerateToken:
if r != nil && r.GenerateToken.GetEstablishmentSecret() != "" {
return nil
}
case *SecretsWriteRequest_Establish:
if r != nil && r.Establish.GetActiveStreamSecret() != "" {
return nil
}
case *SecretsWriteRequest_ExchangeSecret:
if r != nil && r.ExchangeSecret.GetPendingStreamSecret() != "" {
return nil
}
case *SecretsWriteRequest_PromotePending:
if r != nil && r.PromotePending.GetActiveStreamSecret() != "" {
return nil
}
default:
return fmt.Errorf("unexpected request type %T", s.Request)
}
return errors.New("missing secret ID")
}
// TLSDialOption returns the gRPC DialOption to secure the transport if CAPems
// ara available. If no CAPems were provided in the peering token then the
// WithInsecure dial option is returned.
func (p *Peering) TLSDialOption() (grpc.DialOption, error) {
//nolint:staticcheck
tlsOption := grpc.WithInsecure()
if len(p.PeerCAPems) > 0 {
var haveCerts bool
pool := x509.NewCertPool()
for _, pem := range p.PeerCAPems {
if !pool.AppendCertsFromPEM([]byte(pem)) {
return nil, fmt.Errorf("failed to parse PEM %s", pem)
}
if len(pem) > 0 {
haveCerts = true
}
}
if !haveCerts {
return nil, fmt.Errorf("failed to build cert pool from peer CA pems")
}
cfg := tls.Config{
ServerName: p.PeerServerName,
RootCAs: pool,
}
tlsOption = grpc.WithTransportCredentials(credentials.NewTLS(&cfg))
}
return tlsOption, nil
}
func (p *Peering) ToAPI() *api.Peering {
var t api.Peering
PeeringToAPI(p, &t)
return &t
}
// TODO consider using mog for this
func (resp *PeeringListResponse) ToAPI() []*api.Peering {
list := make([]*api.Peering, len(resp.Peerings))
for i, p := range resp.Peerings {
list[i] = p.ToAPI()
}
return list
}
// TODO consider using mog for this
func (resp *GenerateTokenResponse) ToAPI() *api.PeeringGenerateTokenResponse {
var t api.PeeringGenerateTokenResponse
GenerateTokenResponseToAPI(resp, &t)
return &t
}
// TODO consider using mog for this
func (resp *EstablishResponse) ToAPI() *api.PeeringEstablishResponse {
var t api.PeeringEstablishResponse
EstablishResponseToAPI(resp, &t)
return &t
}
func (r *RemoteInfo) IsEmpty() bool {
if r == nil {
return true
}
return r.Partition == "" && r.Datacenter == "" && r.Locality.IsEmpty()
}
// convenience
func NewGenerateTokenRequestFromAPI(req *api.PeeringGenerateTokenRequest) *GenerateTokenRequest {
if req == nil {
return nil
}
t := &GenerateTokenRequest{}
GenerateTokenRequestFromAPI(req, t)
return t
}
// convenience
func NewEstablishRequestFromAPI(req *api.PeeringEstablishRequest) *EstablishRequest {
if req == nil {
return nil
}
t := &EstablishRequest{}
EstablishRequestFromAPI(req, t)
return t
}
func TimePtrFromProto(s *timestamppb.Timestamp) *time.Time {
if s == nil {
return nil
}
t := s.AsTime()
return &t
}
func TimePtrToProto(s *time.Time) *timestamppb.Timestamp {
if s == nil {
return nil
}
return timestamppb.New(*s)
}
// DeepCopy returns a copy of the PeeringTrustBundle that can be passed around
// without worrying about the receiver unsafely modifying it. It is used by the
// generated DeepCopy methods in proxycfg.
func (o *PeeringTrustBundle) DeepCopy() *PeeringTrustBundle {
cp, ok := proto.Clone(o).(*PeeringTrustBundle)
if !ok {
panic(fmt.Sprintf("failed to clone *PeeringTrustBundle, got: %T", cp))
}
return cp
}
// TODO: handle this with mog
// LocalityToStructs converts a protobuf Locality to a struct Locality.
func LocalityToStructs(l *pbcommon.Locality) *structs.Locality {
if l == nil {
return nil
}
return &structs.Locality{
Region: l.Region,
Zone: l.Zone,
}
}
// TODO: handle this with mog
// LocalityFromStructs converts a struct Locality to a protobuf Locality.
func LocalityFromStructs(l *structs.Locality) *pbcommon.Locality {
if l == nil {
return nil
}
return &pbcommon.Locality{
Region: l.Region,
Zone: l.Zone,
}
}
// TODO: handle this with mog
// LocalityToAPI converts a protobuf Locality to an API Locality.
func LocalityToAPI(l *pbcommon.Locality) *api.Locality {
if l == nil {
return nil
}
return &api.Locality{
Region: l.Region,
Zone: l.Zone,
}
}
// TODO: handle this with mog
// LocalityFromProto converts an API Locality to a protobuf Locality.
func LocalityFromAPI(l *api.Locality) *pbcommon.Locality {
if l == nil {
return nil
}
return &pbcommon.Locality{
Region: l.Region,
Zone: l.Zone,
}
}