consul/agent/rpc/peering/subscription_state_test.go
R.B. Boyer 1a8834e1c8
peering: replicate expected SNI, SPIFFE, and service protocol to peers (#13218)
The importing peer will need to know what SNI and SPIFFE name
corresponds to each exported service. Additionally it will need to know
at a high level the protocol in use (L4/L7) to generate the appropriate
connection pool and local metrics.

For replicated connect synthetic entities we edit the `Connect{}` part
of a `NodeService` to have a new section:

    {
      "PeerMeta": {
        "SNI": [
          "web.default.default.owt.external.183150d5-1033-3672-c426-c29205a576b8.consul"
        ],
        "SpiffeID": [
          "spiffe://183150d5-1033-3672-c426-c29205a576b8.consul/ns/default/dc/dc1/svc/web"
        ],
        "Protocol": "tcp"
      }
    }

This data is then replicated and saved as-is at the importing side. Both
SNI and SpiffeID are slices for now until I can be sure we don't need
them for how mesh gateways will ultimately work.
2022-05-25 12:37:44 -05:00

201 lines
5.0 KiB
Go

package peering
import (
"context"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/sdk/testutil"
)
// TestSubscriptionState_Events exercises subscriptionState.sendPendingEvents
// through a sequence of steps that share one subscriptionState. Each step
// hands a pendingPayload to the state and checks which events come out of
// publicUpdateCh: new or changed payloads are expected to be published, while
// a payload identical to one published by an earlier step is expected to be
// omitted.
//
// The steps are order-dependent: later steps rely on what earlier steps have
// already pushed through the shared state.
func TestSubscriptionState_Events(t *testing.T) {
	logger := hclog.NewNullLogger()
	partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()

	state := newSubscriptionState("my-peering", partition)

	// publish sends pending through the shared state on a fresh channel and
	// returns every event observed before the channel is closed.
	publish := func(t *testing.T, pending *pendingPayload) []cache.UpdateEvent {
		t.Helper()

		// Cancel on the way out (including when drainEvents fails the test)
		// so the sender goroutine has a chance to stop instead of blocking on
		// a channel nobody reads anymore.
		// NOTE(review): this assumes sendPendingEvents watches ctx — confirm.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// A new channel per call is required because the goroutine below
		// closes it to signal that sendPendingEvents has returned.
		ch := make(chan cache.UpdateEvent, 1)
		state.publicUpdateCh = ch

		go func() {
			state.sendPendingEvents(ctx, logger, pending)
			close(ch)
		}()

		return drainEvents(t, ch)
	}

	// addMeshGateways registers clones of the given nodes on pending as the
	// mesh gateway payload for this partition. Clones are used so each step
	// hands over instances distinct from the shared fixtures.
	addMeshGateways := func(t *testing.T, pending *pendingPayload, nodes ...*pbservice.CheckServiceNode) {
		t.Helper()

		cloned := make([]*pbservice.CheckServiceNode, 0, len(nodes))
		for _, n := range nodes {
			cloned = append(cloned, proto.Clone(n).(*pbservice.CheckServiceNode))
		}

		require.NoError(t, pending.Add(
			meshGatewayPayloadID,
			subMeshGateway+partition,
			&pbservice.IndexedCheckServiceNodes{Nodes: cloned},
		))
	}

	// requireEvent asserts the correlation ID of evt and the number of nodes
	// carried in its result, failing cleanly (rather than panicking) when the
	// result has an unexpected type.
	requireEvent := func(t *testing.T, evt cache.UpdateEvent, correlationID string, wantNodes int) {
		t.Helper()

		require.Equal(t, correlationID, evt.CorrelationID)

		res, ok := evt.Result.(*pbservice.IndexedCheckServiceNodes)
		require.True(t, ok, "unexpected result type %T", evt.Result)
		require.Len(t, res.Nodes, wantNodes)
	}

	testutil.RunStep(t, "empty", func(t *testing.T) {
		got := publish(t, &pendingPayload{})
		require.Empty(t, got)
	})

	meshNode1 := &pbservice.CheckServiceNode{
		Node:    &pbservice.Node{Node: "foo"},
		Service: &pbservice.NodeService{ID: "mgw-1", Service: "mgw", Kind: "mesh-gateway"},
	}

	testutil.RunStep(t, "one", func(t *testing.T) {
		pending := &pendingPayload{}
		addMeshGateways(t, pending, meshNode1)

		got := publish(t, pending)
		require.Len(t, got, 1)
		requireEvent(t, got[0], subMeshGateway+partition, 1)
	})

	testutil.RunStep(t, "a duplicate is omitted", func(t *testing.T) {
		pending := &pendingPayload{}
		addMeshGateways(t, pending, meshNode1)

		got := publish(t, pending)
		require.Empty(t, got)
	})

	webNode1 := &pbservice.CheckServiceNode{
		Node:    &pbservice.Node{Node: "zim"},
		Service: &pbservice.NodeService{ID: "web-1", Service: "web"},
	}
	webSN := structs.NewServiceName("web", nil)

	testutil.RunStep(t, "a duplicate is omitted even if mixed", func(t *testing.T) {
		pending := &pendingPayload{}

		// Same mesh gateway payload as the previous steps ...
		addMeshGateways(t, pending, meshNode1)

		// ... alongside an exported-service payload not sent before.
		require.NoError(t, pending.Add(
			servicePayloadIDPrefix+webSN.String(),
			subExportedService+webSN.String(),
			&pbservice.IndexedCheckServiceNodes{
				Nodes: []*pbservice.CheckServiceNode{
					proto.Clone(webNode1).(*pbservice.CheckServiceNode),
				},
			},
		))

		// Only the exported-service event is expected.
		got := publish(t, pending)
		require.Len(t, got, 1)
		requireEvent(t, got[0], subExportedService+webSN.String(), 1)
	})

	meshNode2 := &pbservice.CheckServiceNode{
		Node:    &pbservice.Node{Node: "bar"},
		Service: &pbservice.NodeService{ID: "mgw-2", Service: "mgw", Kind: "mesh-gateway"},
	}

	testutil.RunStep(t, "an update to an existing item is published", func(t *testing.T) {
		pending := &pendingPayload{}

		// Same payload ID as before, but now carrying a second node.
		addMeshGateways(t, pending, meshNode1, meshNode2)

		got := publish(t, pending)
		require.Len(t, got, 1)
		requireEvent(t, got[0], subMeshGateway+partition, 2)
	})
}
// drainEvents collects every event received from ch, in arrival order, and
// returns them once ch has been closed.
//
// If ch neither delivers an event nor is closed within 100ms, the test is
// failed via t.Fatal. The timeout restarts after each received event, so it
// bounds the gap between events rather than the total time spent draining.
//
// Because it may call t.Fatal, drainEvents must be called from the goroutine
// running the test (or subtest) that owns t.
func drainEvents(t *testing.T, ch <-chan cache.UpdateEvent) []cache.UpdateEvent {
	// Attribute a timeout failure to the calling step, not to this helper.
	t.Helper()

	var out []cache.UpdateEvent
	for {
		select {
		case evt, ok := <-ch:
			if !ok {
				// Channel closed: the sender is done.
				return out
			}
			out = append(out, evt)
		case <-time.After(100 * time.Millisecond):
			t.Fatal("channel did not close in time")
		}
	}
}
// testNewSubscriptionState builds a subscriptionState for the peering named
// "my-peering" in the given partition and points its publicUpdateCh at a
// freshly created channel with a buffer of one.
//
// It returns the state together with that channel so the caller can read the
// events the state publishes.
func testNewSubscriptionState(partition string) (*subscriptionState, chan cache.UpdateEvent) {
	updates := make(chan cache.UpdateEvent, 1)

	st := newSubscriptionState("my-peering", partition)
	st.publicUpdateCh = updates

	return st, updates
}