From 0ceec9017b07bd271bcdfd1b0ec59d52be32bd3d Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Thu, 25 Aug 2022 11:44:58 -0500 Subject: [PATCH] Expose `grpc_tls` via serf for cluster peering. --- agent/agent.go | 7 ++- agent/consul/config.go | 3 + agent/consul/leader.go | 4 ++ agent/consul/leader_test.go | 16 ++++- agent/consul/peering_backend.go | 16 +++-- agent/consul/server_serf.go | 3 + agent/metadata/server.go | 103 +++++++++++++++++--------------- agent/metadata/server_test.go | 83 +++++++++++++++++-------- 8 files changed, 156 insertions(+), 79 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 3f9f58989a..a92c23e7dd 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -807,7 +807,7 @@ func (a *Agent) listenAndServeGRPC() error { // Spawn listeners and register xds servers. var listeners []net.Listener - start := func(proto string, addrs []net.Addr, srv *grpc.Server) error { + start := func(port_name string, addrs []net.Addr, srv *grpc.Server) error { if len(addrs) < 1 || srv == nil { return nil } @@ -822,13 +822,13 @@ func (a *Agent) listenAndServeGRPC() error { for _, l := range ln { go func(innerL net.Listener) { a.logger.Info("Started gRPC server", - "protocol", proto, + "port_name", port_name, "address", innerL.Addr().String(), "network", innerL.Addr().Network(), ) err := srv.Serve(innerL) if err != nil { - a.logger.Error("gRPC server failed", "protocol", proto, "error", err) + a.logger.Error("gRPC server failed", "port_name", port_name, "error", err) } }(l) } @@ -1228,6 +1228,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co cfg.RPCAdvertise = runtimeCfg.RPCAdvertiseAddr cfg.GRPCPort = runtimeCfg.GRPCPort + cfg.GRPCTLSPort = runtimeCfg.GRPCTLSPort cfg.Segment = runtimeCfg.SegmentName if len(runtimeCfg.Segments) > 0 { diff --git a/agent/consul/config.go b/agent/consul/config.go index 38063f808a..69d4fddee5 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -133,6 +133,9 @@ type Config struct { // GRPCPort is the port the public gRPC server listens on. GRPCPort int + // GRPCTLSPort is the port the public gRPC TLS server listens on. + GRPCTLSPort int + // (Enterprise-only) The network segment this agent is part of. Segment string diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 389b790569..0cc4260df0 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -1077,6 +1077,10 @@ func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *acl.Enterpri if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 { service.Meta["grpc_port"] = grpcPortStr } + grpcTLSPortStr := member.Tags["grpc_tls_port"] + if v, err := strconv.Atoi(grpcTLSPortStr); err == nil && v > 0 { + service.Meta["grpc_tls_port"] = grpcTLSPortStr + } // Attempt to join the consul server if err := s.joinConsulServer(member, parts); err != nil { diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 71bdd0017b..f618117b9e 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "testing" "time" @@ -19,6 +20,7 @@ import ( "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -355,8 +357,10 @@ func TestLeader_CheckServersMeta(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } - t.Parallel() + + ports := freeport.GetN(t, 2) // s3 grpc, s3 grpc_tls + dir1, s1 := testServerWithConfig(t, func(c *Config) { c.PrimaryDatacenter = "dc1" c.ACLsEnabled = true @@ -383,6 +387,8 @@ func TestLeader_CheckServersMeta(t *testing.T) { c.ACLInitialManagementToken = "root" c.ACLResolverSettings.ACLDefaultPolicy = "allow" c.Bootstrap = false + c.GRPCPort = ports[0] + c.GRPCTLSPort = ports[1] }) defer os.RemoveAll(dir3) defer s3.Shutdown() @@ -456,6 +462,14 @@ func TestLeader_CheckServersMeta(t *testing.T) { if newVersion != versionToExpect { r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion) } + grpcPort := service.Meta["grpc_port"] + if grpcPort != strconv.Itoa(ports[0]) { + r.Fatalf("Expected grpc port to be %d, was %s", ports[0], grpcPort) + } + grpcTLSPort := service.Meta["grpc_tls_port"] + if grpcTLSPort != strconv.Itoa(ports[1]) { + r.Fatalf("Expected grpc tls port to be %d, was %s", ports[1], grpcTLSPort) + } }) } diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 0f8b009e9c..5b01b9d040 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -66,11 +66,19 @@ func (b *PeeringBackend) GetServerAddresses() ([]string, error) { } var addrs []string for _, node := range nodes { - grpcPortStr := node.ServiceMeta["grpc_port"] - if v, err := strconv.Atoi(grpcPortStr); err != nil || v < 1 { - continue // skip server that isn't exporting public gRPC properly + // Prefer the TLS port if it is defined. + grpcPortStr := node.ServiceMeta["grpc_tls_port"] + if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 { + addrs = append(addrs, node.Address+":"+grpcPortStr) + continue } - addrs = append(addrs, node.Address+":"+grpcPortStr) + // Fallback to the standard port if TLS is not defined. + grpcPortStr = node.ServiceMeta["grpc_port"] + if v, err := strconv.Atoi(grpcPortStr); err == nil && v > 0 { + addrs = append(addrs, node.Address+":"+grpcPortStr) + continue + } + // Skip node if neither defined. } if len(addrs) == 0 { return nil, fmt.Errorf("a grpc bind port must be specified in the configuration for all servers") diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 569bc977ff..80f44aedc2 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -107,6 +107,9 @@ func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) { if s.config.GRPCPort > 0 { conf.Tags["grpc_port"] = fmt.Sprintf("%d", s.config.GRPCPort) } + if s.config.GRPCTLSPort > 0 { + conf.Tags["grpc_tls_port"] = fmt.Sprintf("%d", s.config.GRPCTLSPort) + } if s.config.Bootstrap { conf.Tags["bootstrap"] = "1" } diff --git a/agent/metadata/server.go b/agent/metadata/server.go index 83997f7cd1..86f48749a1 100644 --- a/agent/metadata/server.go +++ b/agent/metadata/server.go @@ -23,26 +23,30 @@ func (k *Key) Equal(x *Key) bool { // Server is used to return details of a consul server type Server struct { - Name string // . - ShortName string // - ID string - Datacenter string - Segment string - Port int - SegmentAddrs map[string]string - SegmentPorts map[string]int - WanJoinPort int - LanJoinPort int - ExternalGRPCPort int - Bootstrap bool - Expect int - Build version.Version - Version int - RaftVersion int - Addr net.Addr - Status serf.MemberStatus - ReadReplica bool - FeatureFlags map[string]int + Name string // . + ShortName string // + ID string + Datacenter string + Segment string + Port int + SegmentAddrs map[string]string + SegmentPorts map[string]int + WanJoinPort int + LanJoinPort int + + // TODO why are these ports needed? It looks like nothing is referencing them. + ExternalGRPCPort int + ExternalGRPCTLSPort int + + Bootstrap bool + Expect int + Build version.Version + Version int + RaftVersion int + Addr net.Addr + Status serf.MemberStatus + ReadReplica bool + FeatureFlags map[string]int // If true, use TLS when connecting to this server UseTLS bool @@ -137,14 +141,18 @@ func IsConsulServer(m serf.Member) (bool, *Server) { } } - externalGRPCPort := 0 - externalGRPCPortStr, ok := m.Tags["grpc_port"] - if ok { - externalGRPCPort, err = strconv.Atoi(externalGRPCPortStr) - if err != nil { - return false, nil - } - if externalGRPCPort < 1 { + var externalGRPCPort, externalGRPCTLSPort int + externalGRPCPortStr, foundGRPC := m.Tags["grpc_port"] + externalGRPCTLSPortStr, foundGRPCTLS := m.Tags["grpc_tls_port"] + if foundGRPC { + externalGRPCPort, _ = strconv.Atoi(externalGRPCPortStr) + } + if foundGRPCTLS { + externalGRPCTLSPort, _ = strconv.Atoi(externalGRPCTLSPortStr) + } + // If either port tag was found, check to ensure that at least one port was valid. + if foundGRPC || foundGRPCTLS { + if externalGRPCPort < 1 && externalGRPCTLSPort < 1 { return false, nil } } @@ -173,25 +181,26 @@ func IsConsulServer(m serf.Member) (bool, *Server) { addr := &net.TCPAddr{IP: m.Addr, Port: port} parts := &Server{ - Name: m.Name, - ShortName: strings.TrimSuffix(m.Name, "."+datacenter), - ID: m.Tags["id"], - Datacenter: datacenter, - Segment: segment, - Port: port, - SegmentAddrs: segmentAddrs, - SegmentPorts: segmentPorts, - WanJoinPort: wanJoinPort, - LanJoinPort: int(m.Port), - ExternalGRPCPort: externalGRPCPort, - Bootstrap: bootstrap, - Expect: expect, - Addr: addr, - Build: *buildVersion, - Version: vsn, - RaftVersion: raftVsn, - Status: m.Status, - UseTLS: useTLS, + Name: m.Name, + ShortName: strings.TrimSuffix(m.Name, "."+datacenter), + ID: m.Tags["id"], + Datacenter: datacenter, + Segment: segment, + Port: port, + SegmentAddrs: segmentAddrs, + SegmentPorts: segmentPorts, + WanJoinPort: wanJoinPort, + LanJoinPort: int(m.Port), + ExternalGRPCPort: externalGRPCPort, + ExternalGRPCTLSPort: externalGRPCTLSPort, + Bootstrap: bootstrap, + Expect: expect, + Addr: addr, + Build: *buildVersion, + Version: vsn, + RaftVersion: raftVsn, + Status: m.Status, + UseTLS: useTLS, // DEPRECATED - remove nonVoter check once support for that tag is removed ReadReplica: nonVoter || readReplica, FeatureFlags: featureFlags, diff --git a/agent/metadata/server_test.go b/agent/metadata/server_test.go index 2f56bd7fd4..6913021224 100644 --- a/agent/metadata/server_test.go +++ b/agent/metadata/server_test.go @@ -73,6 +73,7 @@ func TestIsConsulServer(t *testing.T) { "build": "0.8.0", "wan_join_port": "1234", "grpc_port": "9876", + "grpc_tls_port": "9877", "vsn": "1", "expect": "3", "raft_vsn": "3", @@ -82,19 +83,20 @@ func TestIsConsulServer(t *testing.T) { } expected := &metadata.Server{ - Name: "foo", - ShortName: "foo", - ID: "asdf", - Datacenter: "east-aws", - Segment: "", - Port: 10000, - SegmentAddrs: map[string]string{}, - SegmentPorts: map[string]int{}, - WanJoinPort: 1234, - LanJoinPort: 5454, - ExternalGRPCPort: 9876, - Bootstrap: false, - Expect: 3, + Name: "foo", + ShortName: "foo", + ID: "asdf", + Datacenter: "east-aws", + Segment: "", + Port: 10000, + SegmentAddrs: map[string]string{}, + SegmentPorts: map[string]int{}, + WanJoinPort: 1234, + LanJoinPort: 5454, + ExternalGRPCPort: 9876, + ExternalGRPCTLSPort: 9877, + Bootstrap: false, + Expect: 3, Addr: &net.TCPAddr{ IP: net.IP([]byte{127, 0, 0, 1}), Port: 10000, @@ -137,15 +139,41 @@ func TestIsConsulServer(t *testing.T) { case "feature-namespaces": m.Tags["ft_ns"] = "1" expected.FeatureFlags = map[string]int{"ns": 1} - // - case "bad-grpc-port": - m.Tags["grpc_port"] = "three" - case "negative-grpc-port": - m.Tags["grpc_port"] = "-1" - case "zero-grpc-port": - m.Tags["grpc_port"] = "0" case "no-role": delete(m.Tags, "role") + // + case "missing-grpc-port": + delete(m.Tags, "grpc_port") + expected.ExternalGRPCPort = 0 + case "missing-grpc-tls-port": + delete(m.Tags, "grpc_tls_port") + expected.ExternalGRPCTLSPort = 0 + case "missing-both-grpc-ports": + delete(m.Tags, "grpc_port") + delete(m.Tags, "grpc_tls_port") + expected.ExternalGRPCPort = 0 + expected.ExternalGRPCTLSPort = 0 + case "bad-both-grpc-ports": + m.Tags["grpc_port"] = "" + m.Tags["grpc_tls_port"] = "" + case "bad-grpc-port": + m.Tags["grpc_port"] = "three" + m.Tags["grpc_tls_port"] = "" + case "bad-grpc-tls-port": + m.Tags["grpc_port"] = "" + m.Tags["grpc_tls_port"] = "three" + case "negative-grpc-port": + m.Tags["grpc_port"] = "-1" + m.Tags["grpc_tls_port"] = "" + case "negative-grpc-tls-port": + m.Tags["grpc_port"] = "" + m.Tags["grpc_tls_port"] = "-1" + case "zero-grpc-port": + m.Tags["grpc_port"] = "0" + m.Tags["grpc_tls_port"] = "" + case "zero-grpc-tls-port": + m.Tags["grpc_port"] = "" + m.Tags["grpc_tls_port"] = "0" default: t.Fatalf("unhandled variant: %s", variant) } @@ -174,11 +202,18 @@ func TestIsConsulServer(t *testing.T) { "bootstrapped": true, "optionals": true, "feature-namespaces": true, - // "no-role": false, - "bad-grpc-port": false, - "negative-grpc-port": false, - "zero-grpc-port": false, + // + "missing-grpc-port": true, + "missing-grpc-tls-port": true, + "missing-both-grpc-ports": true, + "bad-both-grpc-ports": false, + "bad-grpc-port": false, + "negative-grpc-port": false, + "zero-grpc-port": false, + "bad-grpc-tls-port": false, + "negative-grpc-tls-port": false, + "zero-grpc-tls-port": false, } for variant, expectOK := range cases {