Expose grpc_tls via serf for cluster peering.

This commit is contained in:
Derek Menteer 2022-08-25 11:44:58 -05:00
parent 1255a8a20d
commit 0ceec9017b
8 changed files with 156 additions and 79 deletions

View File

@ -807,7 +807,7 @@ func (a *Agent) listenAndServeGRPC() error {
// Spawn listeners and register xds servers.
var listeners []net.Listener
start := func(proto string, addrs []net.Addr, srv *grpc.Server) error {
start := func(port_name string, addrs []net.Addr, srv *grpc.Server) error {
if len(addrs) < 1 || srv == nil {
return nil
}
@ -822,13 +822,13 @@ func (a *Agent) listenAndServeGRPC() error {
for _, l := range ln {
go func(innerL net.Listener) {
a.logger.Info("Started gRPC server",
"protocol", proto,
"port_name", port_name,
"address", innerL.Addr().String(),
"network", innerL.Addr().Network(),
)
err := srv.Serve(innerL)
if err != nil {
a.logger.Error("gRPC server failed", "protocol", proto, "error", err)
a.logger.Error("gRPC server failed", "port_name", port_name, "error", err)
}
}(l)
}
@ -1228,6 +1228,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
cfg.RPCAdvertise = runtimeCfg.RPCAdvertiseAddr
cfg.GRPCPort = runtimeCfg.GRPCPort
cfg.GRPCTLSPort = runtimeCfg.GRPCTLSPort
cfg.Segment = runtimeCfg.SegmentName
if len(runtimeCfg.Segments) > 0 {

View File

@ -133,6 +133,9 @@ type Config struct {
// GRPCPort is the port the public gRPC server listens on.
GRPCPort int
// GRPCTLSPort is the port the public gRPC TLS server listens on.
GRPCTLSPort int
// (Enterprise-only) The network segment this agent is part of.
Segment string

View File

@ -1077,6 +1077,10 @@ func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *acl.Enterpri
if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 {
service.Meta["grpc_port"] = grpcPortStr
}
grpcTLSPortStr := member.Tags["grpc_tls_port"]
if v, err := strconv.Atoi(grpcTLSPortStr); err == nil && v > 0 {
service.Meta["grpc_tls_port"] = grpcTLSPortStr
}
// Attempt to join the consul server
if err := s.joinConsulServer(member, parts); err != nil {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"testing"
"time"
@ -19,6 +20,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
@ -355,8 +357,10 @@ func TestLeader_CheckServersMeta(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
ports := freeport.GetN(t, 2) // s3 grpc, s3 grpc_tls
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
@ -383,6 +387,8 @@ func TestLeader_CheckServersMeta(t *testing.T) {
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
c.GRPCPort = ports[0]
c.GRPCTLSPort = ports[1]
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
@ -456,6 +462,14 @@ func TestLeader_CheckServersMeta(t *testing.T) {
if newVersion != versionToExpect {
r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion)
}
grpcPort := service.Meta["grpc_port"]
if grpcPort != strconv.Itoa(ports[0]) {
r.Fatalf("Expected grpc port to be %d, was %s", ports[0], grpcPort)
}
grpcTLSPort := service.Meta["grpc_tls_port"]
if grpcTLSPort != strconv.Itoa(ports[1]) {
r.Fatalf("Expected grpc tls port to be %d, was %s", ports[1], grpcTLSPort)
}
})
}

View File

@ -66,11 +66,19 @@ func (b *PeeringBackend) GetServerAddresses() ([]string, error) {
}
var addrs []string
for _, node := range nodes {
grpcPortStr := node.ServiceMeta["grpc_port"]
if v, err := strconv.Atoi(grpcPortStr); err != nil || v < 1 {
continue // skip server that isn't exporting public gRPC properly
// Prefer the TLS port if it is defined.
grpcPortStr := node.ServiceMeta["grpc_tls_port"]
if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 {
addrs = append(addrs, node.Address+":"+grpcPortStr)
continue
}
addrs = append(addrs, node.Address+":"+grpcPortStr)
// Fallback to the standard port if TLS is not defined.
grpcPortStr = node.ServiceMeta["grpc_port"]
if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 {
addrs = append(addrs, node.Address+":"+grpcPortStr)
continue
}
// Skip node if neither defined.
}
if len(addrs) == 0 {
return nil, fmt.Errorf("a grpc bind port must be specified in the configuration for all servers")

View File

@ -107,6 +107,9 @@ func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) {
if s.config.GRPCPort > 0 {
conf.Tags["grpc_port"] = fmt.Sprintf("%d", s.config.GRPCPort)
}
if s.config.GRPCTLSPort > 0 {
conf.Tags["grpc_tls_port"] = fmt.Sprintf("%d", s.config.GRPCTLSPort)
}
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
}

View File

@ -23,26 +23,30 @@ func (k *Key) Equal(x *Key) bool {
// Server is used to return details of a consul server
type Server struct {
Name string // <node>.<dc>
ShortName string // <node>
ID string
Datacenter string
Segment string
Port int
SegmentAddrs map[string]string
SegmentPorts map[string]int
WanJoinPort int
LanJoinPort int
ExternalGRPCPort int
Bootstrap bool
Expect int
Build version.Version
Version int
RaftVersion int
Addr net.Addr
Status serf.MemberStatus
ReadReplica bool
FeatureFlags map[string]int
Name string // <node>.<dc>
ShortName string // <node>
ID string
Datacenter string
Segment string
Port int
SegmentAddrs map[string]string
SegmentPorts map[string]int
WanJoinPort int
LanJoinPort int
// TODO why are these ports needed? It looks like nothing is referencing them.
ExternalGRPCPort int
ExternalGRPCTLSPort int
Bootstrap bool
Expect int
Build version.Version
Version int
RaftVersion int
Addr net.Addr
Status serf.MemberStatus
ReadReplica bool
FeatureFlags map[string]int
// If true, use TLS when connecting to this server
UseTLS bool
@ -137,14 +141,18 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
}
}
externalGRPCPort := 0
externalGRPCPortStr, ok := m.Tags["grpc_port"]
if ok {
externalGRPCPort, err = strconv.Atoi(externalGRPCPortStr)
if err != nil {
return false, nil
}
if externalGRPCPort < 1 {
var externalGRPCPort, externalGRPCTLSPort int
externalGRPCPortStr, foundGRPC := m.Tags["grpc_port"]
externalGRPCTLSPortStr, foundGRPCTLS := m.Tags["grpc_tls_port"]
if foundGRPC {
externalGRPCPort, _ = strconv.Atoi(externalGRPCPortStr)
}
if foundGRPCTLS {
externalGRPCTLSPort, _ = strconv.Atoi(externalGRPCTLSPortStr)
}
// If either port tag was found, check to ensure that at least one port was valid.
if foundGRPC || foundGRPCTLS {
if externalGRPCPort < 1 && externalGRPCTLSPort < 1 {
return false, nil
}
}
@ -173,25 +181,26 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
addr := &net.TCPAddr{IP: m.Addr, Port: port}
parts := &Server{
Name: m.Name,
ShortName: strings.TrimSuffix(m.Name, "."+datacenter),
ID: m.Tags["id"],
Datacenter: datacenter,
Segment: segment,
Port: port,
SegmentAddrs: segmentAddrs,
SegmentPorts: segmentPorts,
WanJoinPort: wanJoinPort,
LanJoinPort: int(m.Port),
ExternalGRPCPort: externalGRPCPort,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Build: *buildVersion,
Version: vsn,
RaftVersion: raftVsn,
Status: m.Status,
UseTLS: useTLS,
Name: m.Name,
ShortName: strings.TrimSuffix(m.Name, "."+datacenter),
ID: m.Tags["id"],
Datacenter: datacenter,
Segment: segment,
Port: port,
SegmentAddrs: segmentAddrs,
SegmentPorts: segmentPorts,
WanJoinPort: wanJoinPort,
LanJoinPort: int(m.Port),
ExternalGRPCPort: externalGRPCPort,
ExternalGRPCTLSPort: externalGRPCTLSPort,
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,
Build: *buildVersion,
Version: vsn,
RaftVersion: raftVsn,
Status: m.Status,
UseTLS: useTLS,
// DEPRECATED - remove nonVoter check once support for that tag is removed
ReadReplica: nonVoter || readReplica,
FeatureFlags: featureFlags,

View File

@ -73,6 +73,7 @@ func TestIsConsulServer(t *testing.T) {
"build": "0.8.0",
"wan_join_port": "1234",
"grpc_port": "9876",
"grpc_tls_port": "9877",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
@ -82,19 +83,20 @@ func TestIsConsulServer(t *testing.T) {
}
expected := &metadata.Server{
Name: "foo",
ShortName: "foo",
ID: "asdf",
Datacenter: "east-aws",
Segment: "",
Port: 10000,
SegmentAddrs: map[string]string{},
SegmentPorts: map[string]int{},
WanJoinPort: 1234,
LanJoinPort: 5454,
ExternalGRPCPort: 9876,
Bootstrap: false,
Expect: 3,
Name: "foo",
ShortName: "foo",
ID: "asdf",
Datacenter: "east-aws",
Segment: "",
Port: 10000,
SegmentAddrs: map[string]string{},
SegmentPorts: map[string]int{},
WanJoinPort: 1234,
LanJoinPort: 5454,
ExternalGRPCPort: 9876,
ExternalGRPCTLSPort: 9877,
Bootstrap: false,
Expect: 3,
Addr: &net.TCPAddr{
IP: net.IP([]byte{127, 0, 0, 1}),
Port: 10000,
@ -137,15 +139,41 @@ func TestIsConsulServer(t *testing.T) {
case "feature-namespaces":
m.Tags["ft_ns"] = "1"
expected.FeatureFlags = map[string]int{"ns": 1}
//
case "bad-grpc-port":
m.Tags["grpc_port"] = "three"
case "negative-grpc-port":
m.Tags["grpc_port"] = "-1"
case "zero-grpc-port":
m.Tags["grpc_port"] = "0"
case "no-role":
delete(m.Tags, "role")
//
case "missing-grpc-port":
delete(m.Tags, "grpc_port")
expected.ExternalGRPCPort = 0
case "missing-grpc-tls-port":
delete(m.Tags, "grpc_tls_port")
expected.ExternalGRPCTLSPort = 0
case "missing-both-grpc-ports":
delete(m.Tags, "grpc_port")
delete(m.Tags, "grpc_tls_port")
expected.ExternalGRPCPort = 0
expected.ExternalGRPCTLSPort = 0
case "bad-both-grpc-ports":
m.Tags["grpc_port"] = ""
m.Tags["grpc_tls_port"] = ""
case "bad-grpc-port":
m.Tags["grpc_port"] = "three"
m.Tags["grpc_tls_port"] = ""
case "bad-grpc-tls-port":
m.Tags["grpc_port"] = ""
m.Tags["grpc_tls_port"] = "three"
case "negative-grpc-port":
m.Tags["grpc_port"] = "-1"
m.Tags["grpc_tls_port"] = ""
case "negative-grpc-tls-port":
m.Tags["grpc_port"] = ""
m.Tags["grpc_tls_port"] = "-1"
case "zero-grpc-port":
m.Tags["grpc_port"] = "0"
m.Tags["grpc_tls_port"] = ""
case "zero-grpc-tls-port":
m.Tags["grpc_port"] = ""
m.Tags["grpc_tls_port"] = "0"
default:
t.Fatalf("unhandled variant: %s", variant)
}
@ -174,11 +202,18 @@ func TestIsConsulServer(t *testing.T) {
"bootstrapped": true,
"optionals": true,
"feature-namespaces": true,
//
"no-role": false,
"bad-grpc-port": false,
"negative-grpc-port": false,
"zero-grpc-port": false,
//
"missing-grpc-port": true,
"missing-grpc-tls-port": true,
"missing-both-grpc-ports": true,
"bad-both-grpc-ports": false,
"bad-grpc-port": false,
"negative-grpc-port": false,
"zero-grpc-port": false,
"bad-grpc-tls-port": false,
"negative-grpc-tls-port": false,
"zero-grpc-tls-port": false,
}
for variant, expectOK := range cases {