mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 11:40:06 +00:00
Return mesh gateway addrs if peering through mgw
This commit is contained in:
parent
de04344e47
commit
4ff9d475b0
@ -9,10 +9,12 @@ import (
|
|||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/acl/resolver"
|
"github.com/hashicorp/consul/acl/resolver"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
|
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
|
||||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/ipaddr"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -57,9 +59,38 @@ func (b *PeeringBackend) GetAgentCACertificates() ([]string, error) {
|
|||||||
return b.srv.tlsConfigurator.GRPCManualCAPems(), nil
|
return b.srv.tlsConfigurator.GRPCManualCAPems(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetServerAddresses looks up server node addresses from the state store.
|
// GetServerAddresses looks up server or mesh gateway addresses from the state store.
|
||||||
func (b *PeeringBackend) GetServerAddresses() ([]string, error) {
|
func (b *PeeringBackend) GetServerAddresses() ([]string, error) {
|
||||||
state := b.srv.fsm.State()
|
_, rawEntry, err := b.srv.fsm.State().ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, acl.DefaultEnterpriseMeta())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read mesh config entry: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
meshConfig, ok := rawEntry.(*structs.MeshConfigEntry)
|
||||||
|
if ok && meshConfig.Peering != nil && meshConfig.Peering.PeerThroughMeshGateways {
|
||||||
|
return meshGatewayAdresses(b.srv.fsm.State())
|
||||||
|
}
|
||||||
|
return serverAddresses(b.srv.fsm.State())
|
||||||
|
}
|
||||||
|
|
||||||
|
func meshGatewayAdresses(state *state.Store) ([]string, error) {
|
||||||
|
_, nodes, err := state.ServiceDump(nil, structs.ServiceKindMeshGateway, true, acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to dump gateway addresses: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var addrs []string
|
||||||
|
for _, node := range nodes {
|
||||||
|
_, addr, port := node.BestAddress(true)
|
||||||
|
addrs = append(addrs, ipaddr.FormatAddressPort(addr, port))
|
||||||
|
}
|
||||||
|
if len(addrs) == 0 {
|
||||||
|
return nil, fmt.Errorf("servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
|
||||||
|
}
|
||||||
|
return addrs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serverAddresses(state *state.Store) ([]string, error) {
|
||||||
_, nodes, err := state.ServiceNodes(nil, "consul", structs.DefaultEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword)
|
_, nodes, err := state.ServiceNodes(nil, "consul", structs.DefaultEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -2,10 +2,13 @@ package consul
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
gogrpc "google.golang.org/grpc"
|
gogrpc "google.golang.org/grpc"
|
||||||
|
|
||||||
@ -17,7 +20,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestPeeringBackend_ForwardToLeader(t *testing.T) {
|
func TestPeeringBackend_ForwardToLeader(t *testing.T) {
|
||||||
t.Parallel()
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
_, conf1 := testServerConfig(t)
|
_, conf1 := testServerConfig(t)
|
||||||
server1, err := newServer(t, conf1)
|
server1, err := newServer(t, conf1)
|
||||||
@ -60,6 +65,83 @@ func TestPeeringBackend_ForwardToLeader(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeeringBackend_GetServerAddresses(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, cfg := testServerConfig(t)
|
||||||
|
cfg.GRPCTLSPort = 8505
|
||||||
|
|
||||||
|
srv, err := newServer(t, cfg)
|
||||||
|
require.NoError(t, err)
|
||||||
|
testrpc.WaitForLeader(t, srv.RPC, "dc1")
|
||||||
|
|
||||||
|
backend := NewPeeringBackend(srv)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "peer to servers", func(t *testing.T) {
|
||||||
|
addrs, err := backend.GetServerAddresses()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
|
||||||
|
require.Equal(t, []string{expect}, addrs)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "existence of mesh config entry is not enough to peer through gateways", func(t *testing.T) {
|
||||||
|
mesh := structs.MeshConfigEntry{
|
||||||
|
// Enable unrelated config.
|
||||||
|
TransparentProxy: structs.TransparentProxyMeshConfig{
|
||||||
|
MeshDestinationsOnly: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
|
||||||
|
addrs, err := backend.GetServerAddresses()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Still expect server address because PeerThroughMeshGateways was not enabled.
|
||||||
|
expect := fmt.Sprintf("127.0.0.1:%d", srv.config.GRPCTLSPort)
|
||||||
|
require.Equal(t, []string{expect}, addrs)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "cannot peer through gateways without registered gateways", func(t *testing.T) {
|
||||||
|
mesh := structs.MeshConfigEntry{
|
||||||
|
Peering: &structs.PeeringMeshConfig{PeerThroughMeshGateways: true},
|
||||||
|
}
|
||||||
|
require.NoError(t, srv.fsm.State().EnsureConfigEntry(1, &mesh))
|
||||||
|
|
||||||
|
addrs, err := backend.GetServerAddresses()
|
||||||
|
require.Nil(t, addrs)
|
||||||
|
testutil.RequireErrorContains(t, err,
|
||||||
|
"servers are configured to PeerThroughMeshGateways, but no mesh gateway instances are registered")
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "peer through mesh gateways", func(t *testing.T) {
|
||||||
|
reg := structs.RegisterRequest{
|
||||||
|
ID: types.NodeID("b5489ca9-f5e9-4dba-a779-61fec4e8e364"),
|
||||||
|
Node: "gw-node",
|
||||||
|
Address: "1.2.3.4",
|
||||||
|
TaggedAddresses: map[string]string{
|
||||||
|
structs.TaggedAddressWAN: "172.217.22.14",
|
||||||
|
},
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: "mesh-gateway",
|
||||||
|
Service: "mesh-gateway",
|
||||||
|
Kind: structs.ServiceKindMeshGateway,
|
||||||
|
Port: 443,
|
||||||
|
TaggedAddresses: map[string]structs.ServiceAddress{
|
||||||
|
structs.TaggedAddressWAN: {Address: "154.238.12.252", Port: 8443},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
require.NoError(t, srv.fsm.State().EnsureRegistration(2, ®))
|
||||||
|
|
||||||
|
addrs, err := backend.GetServerAddresses()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, []string{"154.238.12.252:8443"}, addrs)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) {
|
func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn, error) {
|
||||||
return func(ctx context.Context, addr string) (net.Conn, error) {
|
return func(ctx context.Context, addr string) (net.Conn, error) {
|
||||||
d := net.Dialer{}
|
d := net.Dialer{}
|
||||||
@ -79,7 +161,9 @@ func newServerDialer(serverAddr string) func(context.Context, string) (net.Conn,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPeerStreamService_ForwardToLeader(t *testing.T) {
|
func TestPeerStreamService_ForwardToLeader(t *testing.T) {
|
||||||
t.Parallel()
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
_, conf1 := testServerConfig(t)
|
_, conf1 := testServerConfig(t)
|
||||||
server1, err := newServer(t, conf1)
|
server1, err := newServer(t, conf1)
|
||||||
|
@ -117,7 +117,8 @@ type Backend interface {
|
|||||||
// GetAgentCACertificates returns the CA certificate to be returned in the peering token data
|
// GetAgentCACertificates returns the CA certificate to be returned in the peering token data
|
||||||
GetAgentCACertificates() ([]string, error)
|
GetAgentCACertificates() ([]string, error)
|
||||||
|
|
||||||
// GetServerAddresses returns the addresses used for establishing a peering connection
|
// GetServerAddresses returns the addresses used for establishing a peering connection.
|
||||||
|
// These may be server addresses or mesh gateway addresses if peering through mesh gateways.
|
||||||
GetServerAddresses() ([]string, error)
|
GetServerAddresses() ([]string, error)
|
||||||
|
|
||||||
// GetServerName returns the SNI to be returned in the peering token data which
|
// GetServerName returns the SNI to be returned in the peering token data which
|
||||||
|
Loading…
x
Reference in New Issue
Block a user