mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 21:35:52 +00:00
d8299670cc
So that if two datacenters end up with overlapping serverIDs we don't send requests to the wrong server
159 lines
4.3 KiB
Go
159 lines
4.3 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
|
|
"github.com/hashicorp/consul/agent/grpc/resolver"
|
|
"github.com/hashicorp/consul/agent/metadata"
|
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
|
)
|
|
|
|
func TestNewDialer_WithTLSWrapper(t *testing.T) {
|
|
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
t.Cleanup(logError(t, lis.Close))
|
|
|
|
builder := resolver.NewServerResolverBuilder(resolver.Config{})
|
|
builder.AddServer(&metadata.Server{
|
|
Name: "server-1",
|
|
ID: "ID1",
|
|
Datacenter: "dc1",
|
|
Addr: lis.Addr(),
|
|
UseTLS: true,
|
|
})
|
|
|
|
var called bool
|
|
wrapper := func(_ string, conn net.Conn) (net.Conn, error) {
|
|
called = true
|
|
return conn, nil
|
|
}
|
|
dial := newDialer(builder, wrapper)
|
|
ctx := context.Background()
|
|
conn, err := dial(ctx, lis.Addr().String())
|
|
require.NoError(t, err)
|
|
require.NoError(t, conn.Close())
|
|
require.True(t, called, "expected TLSWrapper to be called")
|
|
}
|
|
|
|
// TODO: integration test TestNewDialer with TLS and rpc server, when the rpc
// exists as an isolated component.
|
|
|
|
func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
|
|
count := 4
|
|
cfg := resolver.Config{Scheme: newScheme(t.Name())}
|
|
res := resolver.NewServerResolverBuilder(cfg)
|
|
resolver.RegisterWithGRPC(res)
|
|
pool := NewClientConnPool(res, nil)
|
|
|
|
for i := 0; i < count; i++ {
|
|
name := fmt.Sprintf("server-%d", i)
|
|
srv := newTestServer(t, name, "dc1")
|
|
res.AddServer(srv.Metadata())
|
|
t.Cleanup(srv.shutdown)
|
|
}
|
|
|
|
conn, err := pool.ClientConn("dc1")
|
|
require.NoError(t, err)
|
|
client := testservice.NewSimpleClient(conn)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
t.Cleanup(cancel)
|
|
|
|
first, err := client.Something(ctx, &testservice.Req{})
|
|
require.NoError(t, err)
|
|
|
|
res.RemoveServer(&metadata.Server{ID: first.ServerName, Datacenter: "dc1"})
|
|
|
|
resp, err := client.Something(ctx, &testservice.Req{})
|
|
require.NoError(t, err)
|
|
require.NotEqual(t, resp.ServerName, first.ServerName)
|
|
}
|
|
|
|
// newScheme turns n (a test name in this file) into a resolver scheme string
// by dropping every "/" and "_" and lower-casing what remains.
func newScheme(n string) string {
	stripper := strings.NewReplacer("/", "", "_", "")
	return strings.ToLower(stripper.Replace(n))
}
|
|
|
|
func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
|
|
count := 4
|
|
cfg := resolver.Config{Scheme: newScheme(t.Name())}
|
|
res := resolver.NewServerResolverBuilder(cfg)
|
|
resolver.RegisterWithGRPC(res)
|
|
pool := NewClientConnPool(res, nil)
|
|
|
|
for i := 0; i < count; i++ {
|
|
name := fmt.Sprintf("server-%d", i)
|
|
srv := newTestServer(t, name, "dc1")
|
|
res.AddServer(srv.Metadata())
|
|
t.Cleanup(srv.shutdown)
|
|
}
|
|
|
|
conn, err := pool.ClientConn("dc1")
|
|
require.NoError(t, err)
|
|
client := testservice.NewSimpleClient(conn)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
t.Cleanup(cancel)
|
|
|
|
first, err := client.Something(ctx, &testservice.Req{})
|
|
require.NoError(t, err)
|
|
|
|
t.Run("rebalance a different DC, does nothing", func(t *testing.T) {
|
|
res.NewRebalancer("dc-other")()
|
|
|
|
resp, err := client.Something(ctx, &testservice.Req{})
|
|
require.NoError(t, err)
|
|
require.Equal(t, resp.ServerName, first.ServerName)
|
|
})
|
|
|
|
t.Run("rebalance the dc", func(t *testing.T) {
|
|
// Rebalance is random, but if we repeat it a few times it should give us a
|
|
// new server.
|
|
retry.RunWith(fastRetry, t, func(r *retry.R) {
|
|
res.NewRebalancer("dc1")()
|
|
|
|
resp, err := client.Something(ctx, &testservice.Req{})
|
|
require.NoError(r, err)
|
|
require.NotEqual(r, resp.ServerName, first.ServerName)
|
|
})
|
|
})
|
|
}
|
|
|
|
func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
|
|
dcs := []string{"dc1", "dc2", "dc3"}
|
|
|
|
cfg := resolver.Config{Scheme: newScheme(t.Name())}
|
|
res := resolver.NewServerResolverBuilder(cfg)
|
|
resolver.RegisterWithGRPC(res)
|
|
pool := NewClientConnPool(res, nil)
|
|
|
|
for _, dc := range dcs {
|
|
name := "server-0-" + dc
|
|
srv := newTestServer(t, name, dc)
|
|
res.AddServer(srv.Metadata())
|
|
t.Cleanup(srv.shutdown)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
t.Cleanup(cancel)
|
|
|
|
for _, dc := range dcs {
|
|
conn, err := pool.ClientConn(dc)
|
|
require.NoError(t, err)
|
|
client := testservice.NewSimpleClient(conn)
|
|
|
|
resp, err := client.Something(ctx, &testservice.Req{})
|
|
require.NoError(t, err)
|
|
require.Equal(t, resp.Datacenter, dc)
|
|
}
|
|
}
|