peering, internal: support UIServices, UINodes, UINodeInfo (#13577)

This commit is contained in:
alex 2022-06-24 15:17:35 -07:00 committed by GitHub
parent 0b3f90c3e7
commit 53f0cf5835
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 566 additions and 180 deletions

View File

@ -69,18 +69,60 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
index, dump, err := state.NodeDump(ws, &args.EnterpriseMeta, args.PeerName) // we don't support calling this endpoint for a specific peer
if err != nil { if args.PeerName != "" {
return err return fmt.Errorf("this endpoint does not support specifying a peer: %q", args.PeerName)
} }
reply.Index, reply.Dump = index, dump
// this maxIndex will be the max of the NodeDump calls and the PeeringList call
var maxIndex uint64
// Get data for local nodes
index, dump, err := state.NodeDump(ws, &args.EnterpriseMeta, structs.DefaultPeerKeyword)
if err != nil {
return fmt.Errorf("could not get a node dump for local nodes: %w", err)
}
if index > maxIndex {
maxIndex = index
}
reply.Dump = dump
// get a list of all peerings
index, listedPeerings, err := state.PeeringList(ws, args.EnterpriseMeta)
if err != nil {
return fmt.Errorf("could not list peers for node dump %w", err)
}
if index > maxIndex {
maxIndex = index
}
// get node dumps for all peerings
for _, p := range listedPeerings {
index, importedDump, err := state.NodeDump(ws, &args.EnterpriseMeta, p.Name)
if err != nil {
return fmt.Errorf("could not get a node dump for peer %q: %w", p.Name, err)
}
reply.ImportedDump = append(reply.ImportedDump, importedDump...)
if index > maxIndex {
maxIndex = index
}
}
reply.Index = maxIndex
raw, err := filter.Execute(reply.Dump) raw, err := filter.Execute(reply.Dump)
if err != nil { if err != nil {
return err return fmt.Errorf("could not filter local node dump: %w", err)
} }
reply.Dump = raw.(structs.NodeDump) reply.Dump = raw.(structs.NodeDump)
importedRaw, err := filter.Execute(reply.ImportedDump)
if err != nil {
return fmt.Errorf("could not filter peer node dump: %w", err)
}
reply.ImportedDump = importedRaw.(structs.NodeDump)
// Note: we filter the results with ACLs *after* applying the user-supplied // Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include // bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission. // results that would be filtered out even if the user did have permission.
@ -111,13 +153,47 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
// Get, store, and filter nodes // we don't support calling this endpoint for a specific peer
maxIdx, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, args.PeerName) if args.PeerName != "" {
return fmt.Errorf("this endpoint does not support specifying a peer: %q", args.PeerName)
}
// this maxIndex will be the max of the ServiceDump calls and the PeeringList call
var maxIndex uint64
// get a local dump for services
index, nodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, structs.DefaultPeerKeyword)
if err != nil { if err != nil {
return err return fmt.Errorf("could not get a service dump for local nodes: %w", err)
}
if index > maxIndex {
maxIndex = index
} }
reply.Nodes = nodes reply.Nodes = nodes
// get a list of all peerings
index, listedPeerings, err := state.PeeringList(ws, args.EnterpriseMeta)
if err != nil {
return fmt.Errorf("could not list peers for service dump %w", err)
}
if index > maxIndex {
maxIndex = index
}
for _, p := range listedPeerings {
index, importedNodes, err := state.ServiceDump(ws, args.ServiceKind, args.UseServiceKind, &args.EnterpriseMeta, p.Name)
if err != nil {
return fmt.Errorf("could not get a service dump for peer %q: %w", p.Name, err)
}
if index > maxIndex {
maxIndex = index
}
reply.ImportedNodes = append(reply.ImportedNodes, importedNodes...)
}
// Get, store, and filter gateway services // Get, store, and filter gateway services
idx, gatewayServices, err := state.DumpGatewayServices(ws) idx, gatewayServices, err := state.DumpGatewayServices(ws)
if err != nil { if err != nil {
@ -125,17 +201,23 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
} }
reply.Gateways = gatewayServices reply.Gateways = gatewayServices
if idx > maxIdx { if idx > maxIndex {
maxIdx = idx maxIndex = idx
} }
reply.Index = maxIdx reply.Index = maxIndex
raw, err := filter.Execute(reply.Nodes) raw, err := filter.Execute(reply.Nodes)
if err != nil { if err != nil {
return err return fmt.Errorf("could not filter local service dump: %w", err)
} }
reply.Nodes = raw.(structs.CheckServiceNodes) reply.Nodes = raw.(structs.CheckServiceNodes)
importedRaw, err := filter.Execute(reply.ImportedNodes)
if err != nil {
return fmt.Errorf("could not filter peer service dump: %w", err)
}
reply.ImportedNodes = importedRaw.(structs.CheckServiceNodes)
// Note: we filter the results with ACLs *after* applying the user-supplied // Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include // bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
// results that would be filtered out even if the user did have permission. // results that would be filtered out even if the user did have permission.

View File

@ -8,6 +8,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -17,6 +18,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
@ -29,15 +31,13 @@ func TestInternal_NodeInfo(t *testing.T) {
} }
t.Parallel() t.Parallel()
dir1, s1 := testServer(t) _, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1) codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{ args := []*structs.RegisterRequest{
{
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
@ -51,22 +51,31 @@ func TestInternal_NodeInfo(t *testing.T) {
Status: api.HealthPassing, Status: api.HealthPassing,
ServiceID: "db", ServiceID: "db",
}, },
} },
var out struct{} {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { Datacenter: "dc1",
t.Fatalf("err: %v", err) Node: "foo",
Address: "127.0.0.3",
PeerName: "peer1",
},
} }
var out2 structs.IndexedNodeDump for _, reg := range args {
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
require.NoError(t, err)
}
t.Run("get local node", func(t *testing.T) {
var out structs.IndexedNodeDump
req := structs.NodeSpecificRequest{ req := structs.NodeSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
} }
if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out2); err != nil { if err := msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
nodes := out2.Dump nodes := out.Dump
if len(nodes) != 1 { if len(nodes) != 1 {
t.Fatalf("Bad: %v", nodes) t.Fatalf("Bad: %v", nodes)
} }
@ -79,6 +88,22 @@ func TestInternal_NodeInfo(t *testing.T) {
if nodes[0].Checks[0].Status != api.HealthPassing { if nodes[0].Checks[0].Status != api.HealthPassing {
t.Fatalf("Bad: %v", nodes[0]) t.Fatalf("Bad: %v", nodes[0])
} }
})
t.Run("get peered node", func(t *testing.T) {
var out structs.IndexedNodeDump
req := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "foo",
PeerName: "peer1",
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeInfo", &req, &out))
nodes := out.Dump
require.Equal(t, 1, len(nodes))
require.Equal(t, "foo", nodes[0].Node)
require.Equal(t, "peer1", nodes[0].PeerName)
})
} }
func TestInternal_NodeDump(t *testing.T) { func TestInternal_NodeDump(t *testing.T) {
@ -87,15 +112,13 @@ func TestInternal_NodeDump(t *testing.T) {
} }
t.Parallel() t.Parallel()
dir1, s1 := testServer(t) _, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1) codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{ args := []*structs.RegisterRequest{
{
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
@ -109,13 +132,8 @@ func TestInternal_NodeDump(t *testing.T) {
Status: api.HealthPassing, Status: api.HealthPassing,
ServiceID: "db", ServiceID: "db",
}, },
} },
var out struct{} {
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg = structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "bar", Node: "bar",
Address: "127.0.0.2", Address: "127.0.0.2",
@ -129,11 +147,26 @@ func TestInternal_NodeDump(t *testing.T) {
Status: api.HealthWarning, Status: api.HealthWarning,
ServiceID: "db", ServiceID: "db",
}, },
},
{
Datacenter: "dc1",
Node: "foo-peer",
Address: "127.0.0.3",
PeerName: "peer1",
},
} }
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err) for _, reg := range args {
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
require.NoError(t, err)
} }
err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
Name: "peer1",
})
require.NoError(t, err)
var out2 structs.IndexedNodeDump var out2 structs.IndexedNodeDump
req := structs.DCSpecificRequest{ req := structs.DCSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -175,6 +208,10 @@ func TestInternal_NodeDump(t *testing.T) {
if !foundFoo || !foundBar { if !foundFoo || !foundBar {
t.Fatalf("missing foo or bar") t.Fatalf("missing foo or bar")
} }
require.Len(t, out2.ImportedDump, 1)
require.Equal(t, "peer1", out2.ImportedDump[0].PeerName)
require.Equal(t, "foo-peer", out2.ImportedDump[0].Node)
} }
func TestInternal_NodeDump_Filter(t *testing.T) { func TestInternal_NodeDump_Filter(t *testing.T) {
@ -183,15 +220,13 @@ func TestInternal_NodeDump_Filter(t *testing.T) {
} }
t.Parallel() t.Parallel()
dir1, s1 := testServer(t) _, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1) codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{ args := []*structs.RegisterRequest{
{
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
@ -205,11 +240,8 @@ func TestInternal_NodeDump_Filter(t *testing.T) {
Status: api.HealthPassing, Status: api.HealthPassing,
ServiceID: "db", ServiceID: "db",
}, },
} },
var out struct{} {
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "bar", Node: "bar",
Address: "127.0.0.2", Address: "127.0.0.2",
@ -223,10 +255,27 @@ func TestInternal_NodeDump_Filter(t *testing.T) {
Status: api.HealthWarning, Status: api.HealthWarning,
ServiceID: "db", ServiceID: "db",
}, },
},
{
Datacenter: "dc1",
Node: "foo-peer",
Address: "127.0.0.3",
PeerName: "peer1",
},
} }
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) for _, reg := range args {
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", reg, nil)
require.NoError(t, err)
}
err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
Name: "peer1",
})
require.NoError(t, err)
t.Run("filter on the local node", func(t *testing.T) {
var out2 structs.IndexedNodeDump var out2 structs.IndexedNodeDump
req := structs.DCSpecificRequest{ req := structs.DCSpecificRequest{
Datacenter: "dc1", Datacenter: "dc1",
@ -237,6 +286,41 @@ func TestInternal_NodeDump_Filter(t *testing.T) {
nodes := out2.Dump nodes := out2.Dump
require.Len(t, nodes, 1) require.Len(t, nodes, 1)
require.Equal(t, "foo", nodes[0].Node) require.Equal(t, "foo", nodes[0].Node)
})
t.Run("filter on imported dump", func(t *testing.T) {
var out3 structs.IndexedNodeDump
req2 := structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Filter: "friend in PeerName"},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
require.Len(t, out3.Dump, 0)
require.Len(t, out3.ImportedDump, 0)
})
t.Run("filter look for peer nodes (non local nodes)", func(t *testing.T) {
var out3 structs.IndexedNodeDump
req2 := structs.DCSpecificRequest{
QueryOptions: structs.QueryOptions{Filter: "PeerName != \"\""},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
require.Len(t, out3.Dump, 0)
require.Len(t, out3.ImportedDump, 1)
})
t.Run("filter look for a specific peer", func(t *testing.T) {
var out3 structs.IndexedNodeDump
req2 := structs.DCSpecificRequest{
QueryOptions: structs.QueryOptions{Filter: "PeerName == peer1"},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.NodeDump", &req2, &out3))
require.Len(t, out3.Dump, 0)
require.Len(t, out3.ImportedDump, 1)
})
} }
func TestInternal_KeyringOperation(t *testing.T) { func TestInternal_KeyringOperation(t *testing.T) {
@ -1665,6 +1749,89 @@ func TestInternal_GatewayServiceDump_Ingress_ACL(t *testing.T) {
require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning) require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning)
} }
func TestInternal_ServiceDump_Peering(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, s1 := testServer(t)
codec := rpcClient(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// prep the cluster with some data we can use in our filters
registerTestCatalogEntries(t, codec)
doRequest := func(t *testing.T, filter string) structs.IndexedNodesWithGateways {
t.Helper()
args := structs.DCSpecificRequest{
QueryOptions: structs.QueryOptions{Filter: filter},
}
var out structs.IndexedNodesWithGateways
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceDump", &args, &out))
return out
}
t.Run("No peerings", func(t *testing.T) {
nodes := doRequest(t, "")
// redis (3), web (3), critical (1), warning (1) and consul (1)
require.Len(t, nodes.Nodes, 9)
require.Len(t, nodes.ImportedNodes, 0)
})
addPeerService(t, codec)
err := s1.fsm.State().PeeringWrite(1, &pbpeering.Peering{
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
Name: "peer1",
})
require.NoError(t, err)
t.Run("peerings", func(t *testing.T) {
nodes := doRequest(t, "")
// redis (3), web (3), critical (1), warning (1) and consul (1)
require.Len(t, nodes.Nodes, 9)
// service (1)
require.Len(t, nodes.ImportedNodes, 1)
})
t.Run("peerings w filter", func(t *testing.T) {
nodes := doRequest(t, "Node.PeerName == foo")
require.Len(t, nodes.Nodes, 0)
require.Len(t, nodes.ImportedNodes, 0)
nodes2 := doRequest(t, "Node.PeerName == peer1")
require.Len(t, nodes2.Nodes, 0)
require.Len(t, nodes2.ImportedNodes, 1)
})
}
func addPeerService(t *testing.T, codec rpc.ClientCodec) {
// prep the cluster with some data we can use in our filters
registrations := map[string]*structs.RegisterRequest{
"Peer node foo with peer service": {
Datacenter: "dc1",
Node: "foo",
ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
Address: "127.0.0.2",
PeerName: "peer1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "serviceID",
Service: "service",
Port: 1235,
Address: "198.18.1.2",
PeerName: "peer1",
},
},
}
registerTestCatalogEntriesMap(t, codec, registrations)
}
func TestInternal_GatewayIntentions(t *testing.T) { func TestInternal_GatewayIntentions(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")

View File

@ -2239,6 +2239,7 @@ type IndexedCheckServiceNodes struct {
} }
type IndexedNodesWithGateways struct { type IndexedNodesWithGateways struct {
ImportedNodes CheckServiceNodes
Nodes CheckServiceNodes Nodes CheckServiceNodes
Gateways GatewayServices Gateways GatewayServices
QueryMeta QueryMeta
@ -2250,6 +2251,7 @@ type DatacenterIndexedCheckServiceNodes struct {
} }
type IndexedNodeDump struct { type IndexedNodeDump struct {
ImportedDump NodeDump
Dump NodeDump Dump NodeDump
QueryMeta QueryMeta
} }

View File

@ -37,6 +37,8 @@ type ServiceSummary struct {
transparentProxySet bool transparentProxySet bool
ConnectNative bool ConnectNative bool
PeerName string `json:",omitempty"`
acl.EnterpriseMeta acl.EnterpriseMeta
} }
@ -117,7 +119,18 @@ RPC:
if out.Dump == nil { if out.Dump == nil {
out.Dump = make(structs.NodeDump, 0) out.Dump = make(structs.NodeDump, 0)
} }
return out.Dump, nil
// Use empty list instead of nil
for _, info := range out.ImportedDump {
if info.Services == nil {
info.Services = make([]*structs.NodeService, 0)
}
if info.Checks == nil {
info.Checks = make([]*structs.HealthCheck, 0)
}
}
return append(out.Dump, out.ImportedDump...), nil
} }
// UINodeInfo is used to get info on a single node in a given datacenter. We return a // UINodeInfo is used to get info on a single node in a given datacenter. We return a
@ -139,6 +152,10 @@ func (s *HTTPHandlers) UINodeInfo(resp http.ResponseWriter, req *http.Request) (
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"} return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"}
} }
if peer := req.URL.Query().Get("peer"); peer != "" {
args.PeerName = peer
}
// Make the RPC request // Make the RPC request
var out structs.IndexedNodeDump var out structs.IndexedNodeDump
defer setMeta(resp, &out.QueryMeta) defer setMeta(resp, &out.QueryMeta)
@ -216,15 +233,17 @@ RPC:
// Store the names of the gateways associated with each service // Store the names of the gateways associated with each service
var ( var (
serviceGateways = make(map[structs.ServiceName][]structs.ServiceName) serviceGateways = make(map[structs.PeeredServiceName][]structs.PeeredServiceName)
numLinkedServices = make(map[structs.ServiceName]int) numLinkedServices = make(map[structs.PeeredServiceName]int)
) )
for _, gs := range out.Gateways { for _, gs := range out.Gateways {
serviceGateways[gs.Service] = append(serviceGateways[gs.Service], gs.Gateway) psn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: gs.Service}
numLinkedServices[gs.Gateway] += 1 gpsn := structs.PeeredServiceName{Peer: structs.DefaultPeerKeyword, ServiceName: gs.Gateway}
serviceGateways[psn] = append(serviceGateways[psn], gpsn)
numLinkedServices[gpsn] += 1
} }
summaries, hasProxy := summarizeServices(out.Nodes.ToServiceDump(), nil, "") summaries, hasProxy := summarizeServices(append(out.Nodes, out.ImportedNodes...).ToServiceDump(), nil, "")
sorted := prepSummaryOutput(summaries, false) sorted := prepSummaryOutput(summaries, false)
// Ensure at least a zero length slice // Ensure at least a zero length slice
@ -233,17 +252,18 @@ RPC:
sum := ServiceListingSummary{ServiceSummary: *svc} sum := ServiceListingSummary{ServiceSummary: *svc}
sn := structs.NewServiceName(svc.Name, &svc.EnterpriseMeta) sn := structs.NewServiceName(svc.Name, &svc.EnterpriseMeta)
if hasProxy[sn] { psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
if hasProxy[psn] {
sum.ConnectedWithProxy = true sum.ConnectedWithProxy = true
} }
// Verify that at least one of the gateways linked by config entry has an instance registered in the catalog // Verify that at least one of the gateways linked by config entry has an instance registered in the catalog
for _, gw := range serviceGateways[sn] { for _, gw := range serviceGateways[psn] {
if s := summaries[gw]; s != nil && sum.InstanceCount > 0 { if s := summaries[gw]; s != nil && sum.InstanceCount > 0 {
sum.ConnectedWithGateway = true sum.ConnectedWithGateway = true
} }
} }
sum.GatewayConfig.AssociatedServiceCount = numLinkedServices[sn] sum.GatewayConfig.AssociatedServiceCount = numLinkedServices[psn]
result = append(result, &sum) result = append(result, &sum)
} }
@ -389,31 +409,43 @@ RPC:
return topo, nil return topo, nil
} }
func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc string) (map[structs.ServiceName]*ServiceSummary, map[structs.ServiceName]bool) { func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc string) (map[structs.PeeredServiceName]*ServiceSummary, map[structs.PeeredServiceName]bool) {
var ( var (
summary = make(map[structs.ServiceName]*ServiceSummary) summary = make(map[structs.PeeredServiceName]*ServiceSummary)
hasProxy = make(map[structs.ServiceName]bool) hasProxy = make(map[structs.PeeredServiceName]bool)
) )
getService := func(service structs.ServiceName) *ServiceSummary { getService := func(psn structs.PeeredServiceName) *ServiceSummary {
serv, ok := summary[service] serv, ok := summary[psn]
if !ok { if !ok {
serv = &ServiceSummary{ serv = &ServiceSummary{
Name: service.Name, Name: psn.ServiceName.Name,
EnterpriseMeta: service.EnterpriseMeta, EnterpriseMeta: psn.ServiceName.EnterpriseMeta,
// the other code will increment this unconditionally so we // the other code will increment this unconditionally so we
// shouldn't initialize it to 1 // shouldn't initialize it to 1
InstanceCount: 0, InstanceCount: 0,
PeerName: psn.Peer,
} }
summary[service] = serv summary[psn] = serv
} }
return serv return serv
} }
for _, csn := range dump { for _, csn := range dump {
var peerName string
// all entities will have the same peer name so it is safe to use the node's peer name
if csn.Node == nil {
// this can happen for gateway dumps that call this summarize func
peerName = structs.DefaultPeerKeyword
} else {
peerName = csn.Node.PeerName
}
if cfg != nil && csn.GatewayService != nil { if cfg != nil && csn.GatewayService != nil {
gwsvc := csn.GatewayService gwsvc := csn.GatewayService
sum := getService(gwsvc.Service)
psn := structs.PeeredServiceName{Peer: peerName, ServiceName: gwsvc.Service}
sum := getService(psn)
modifySummaryForGatewayService(cfg, dc, sum, gwsvc) modifySummaryForGatewayService(cfg, dc, sum, gwsvc)
} }
@ -421,8 +453,10 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
if csn.Service == nil { if csn.Service == nil {
continue continue
} }
sn := structs.NewServiceName(csn.Service.Service, &csn.Service.EnterpriseMeta) sn := structs.NewServiceName(csn.Service.Service, &csn.Service.EnterpriseMeta)
sum := getService(sn) psn := structs.PeeredServiceName{Peer: peerName, ServiceName: sn}
sum := getService(psn)
svc := csn.Service svc := csn.Service
sum.Nodes = append(sum.Nodes, csn.Node.Node) sum.Nodes = append(sum.Nodes, csn.Node.Node)
@ -432,9 +466,10 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
sum.ConnectNative = svc.Connect.Native sum.ConnectNative = svc.Connect.Native
if svc.Kind == structs.ServiceKindConnectProxy { if svc.Kind == structs.ServiceKindConnectProxy {
sn := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta) sn := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)
hasProxy[sn] = true psn := structs.PeeredServiceName{Peer: peerName, ServiceName: sn}
hasProxy[psn] = true
destination := getService(sn) destination := getService(psn)
for _, check := range csn.Checks { for _, check := range csn.Checks {
cid := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta) cid := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
uid := structs.UniqueID(csn.Node.Node, cid.String()) uid := structs.UniqueID(csn.Node.Node, cid.String())
@ -496,7 +531,7 @@ func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc s
return summary, hasProxy return summary, hasProxy
} }
func prepSummaryOutput(summaries map[structs.ServiceName]*ServiceSummary, excludeSidecars bool) []*ServiceSummary { func prepSummaryOutput(summaries map[structs.PeeredServiceName]*ServiceSummary, excludeSidecars bool) []*ServiceSummary {
var resp []*ServiceSummary var resp []*ServiceSummary
// Ensure at least a zero length slice // Ensure at least a zero length slice
resp = make([]*ServiceSummary, 0) resp = make([]*ServiceSummary, 0)

View File

@ -2,6 +2,7 @@ package agent
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -11,6 +12,7 @@ import (
"path/filepath" "path/filepath"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time"
cleanhttp "github.com/hashicorp/go-cleanhttp" cleanhttp "github.com/hashicorp/go-cleanhttp"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -19,12 +21,14 @@ import (
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
) )
func TestUiIndex(t *testing.T) { func TestUIIndex(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
@ -74,7 +78,7 @@ func TestUiIndex(t *testing.T) {
} }
} }
func TestUiNodes(t *testing.T) { func TestUINodes(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
@ -84,15 +88,42 @@ func TestUiNodes(t *testing.T) {
defer a.Shutdown() defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1") testrpc.WaitForTestAgent(t, a.RPC, "dc1")
args := &structs.RegisterRequest{ args := []*structs.RegisterRequest{
{
Datacenter: "dc1", Datacenter: "dc1",
Node: "test", Node: "test",
Address: "127.0.0.1", Address: "127.0.0.1",
},
{
Datacenter: "dc1",
Node: "foo-peer",
Address: "127.0.0.3",
PeerName: "peer1",
},
} }
for _, reg := range args {
var out struct{} var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil { err := a.RPC("Catalog.Register", reg, &out)
t.Fatalf("err: %v", err) require.NoError(t, err)
}
// establish "peer1"
{
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
peerOne := &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "peer1",
State: pbpeering.PeeringState_INITIAL,
PeerCAPems: nil,
PeerServerName: "fooservername",
PeerServerAddresses: []string{"addr1"},
},
}
_, err := a.rpcClientPeering.PeeringWrite(ctx, peerOne)
require.NoError(t, err)
} }
req, _ := http.NewRequest("GET", "/v1/internal/ui/nodes/dc1", nil) req, _ := http.NewRequest("GET", "/v1/internal/ui/nodes/dc1", nil)
@ -103,20 +134,32 @@ func TestUiNodes(t *testing.T) {
} }
assertIndex(t, resp) assertIndex(t, resp)
// Should be 2 nodes, and all the empty lists should be non-nil // Should be 3 nodes, and all the empty lists should be non-nil
nodes := obj.(structs.NodeDump) nodes := obj.(structs.NodeDump)
if len(nodes) != 2 || require.Len(t, nodes, 3)
nodes[0].Node != a.Config.NodeName ||
nodes[0].Services == nil || len(nodes[0].Services) != 1 || // check local nodes, services and checks
nodes[0].Checks == nil || len(nodes[0].Checks) != 1 || require.Equal(t, a.Config.NodeName, nodes[0].Node)
nodes[1].Node != "test" || require.NotNil(t, nodes[0].Services)
nodes[1].Services == nil || len(nodes[1].Services) != 0 || require.Len(t, nodes[0].Services, 1)
nodes[1].Checks == nil || len(nodes[1].Checks) != 0 { require.NotNil(t, nodes[0].Checks)
t.Fatalf("bad: %v", obj) require.Len(t, nodes[0].Checks, 1)
} require.Equal(t, "test", nodes[1].Node)
require.NotNil(t, nodes[1].Services)
require.Len(t, nodes[1].Services, 0)
require.NotNil(t, nodes[1].Checks)
require.Len(t, nodes[1].Checks, 0)
// peered node
require.Equal(t, "foo-peer", nodes[2].Node)
require.Equal(t, "peer1", nodes[2].PeerName)
require.NotNil(t, nodes[2].Services)
require.Len(t, nodes[2].Services, 0)
require.NotNil(t, nodes[1].Checks)
require.Len(t, nodes[2].Services, 0)
} }
func TestUiNodes_Filter(t *testing.T) { func TestUINodes_Filter(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
@ -162,7 +205,7 @@ func TestUiNodes_Filter(t *testing.T) {
require.Empty(t, nodes[0].Checks) require.Empty(t, nodes[0].Checks)
} }
func TestUiNodeInfo(t *testing.T) { func TestUINodeInfo(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
@ -214,7 +257,7 @@ func TestUiNodeInfo(t *testing.T) {
} }
} }
func TestUiServices(t *testing.T) { func TestUIServices(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
@ -318,6 +361,30 @@ func TestUiServices(t *testing.T) {
Tags: []string{}, Tags: []string{},
}, },
}, },
// register peer node foo with peer service
{
Datacenter: "dc1",
Node: "foo",
ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
Address: "127.0.0.2",
TaggedAddresses: map[string]string{
"lan": "127.0.0.2",
"wan": "198.18.0.2",
},
NodeMeta: map[string]string{
"env": "production",
"os": "linux",
},
PeerName: "peer1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "serviceID",
Service: "service",
Port: 1235,
Address: "198.18.1.2",
PeerName: "peer1",
},
},
} }
for _, args := range requests { for _, args := range requests {
@ -325,6 +392,24 @@ func TestUiServices(t *testing.T) {
require.NoError(t, a.RPC("Catalog.Register", args, &out)) require.NoError(t, a.RPC("Catalog.Register", args, &out))
} }
// establish "peer1"
{
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
peerOne := &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "peer1",
State: pbpeering.PeeringState_INITIAL,
PeerCAPems: nil,
PeerServerName: "fooservername",
PeerServerAddresses: []string{"addr1"},
},
}
_, err := a.rpcClientPeering.PeeringWrite(ctx, peerOne)
require.NoError(t, err)
}
// Register a terminating gateway associated with api and cache // Register a terminating gateway associated with api and cache
{ {
arg := structs.RegisterRequest{ arg := structs.RegisterRequest{
@ -393,7 +478,7 @@ func TestUiServices(t *testing.T) {
// Should be 2 nodes, and all the empty lists should be non-nil // Should be 2 nodes, and all the empty lists should be non-nil
summary := obj.([]*ServiceListingSummary) summary := obj.([]*ServiceListingSummary)
require.Len(t, summary, 6) require.Len(t, summary, 7)
// internal accounting that users don't see can be blown away // internal accounting that users don't see can be blown away
for _, sum := range summary { for _, sum := range summary {
@ -493,6 +578,21 @@ func TestUiServices(t *testing.T) {
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}, },
}, },
{
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTypical,
Name: "service",
Datacenter: "dc1",
Tags: nil,
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 0,
ChecksWarning: 0,
ChecksCritical: 0,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
PeerName: "peer1",
},
},
} }
require.ElementsMatch(t, expected, summary) require.ElementsMatch(t, expected, summary)
}) })