peering: add TrustBundleListByService endpoint (#13048)

This commit is contained in:
Evan Culver 2022-05-12 15:58:22 -07:00 committed by GitHub
parent 4e215dc411
commit 0fa5e7be5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 944 additions and 256 deletions

View File

@ -108,7 +108,9 @@ type Store interface {
PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error)
PeeringsForService(ws memdb.WatchSet, serviceName string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
AbandonCh() <-chan struct{}
}
@ -373,6 +375,51 @@ func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelet
return &pbpeering.PeeringDeleteResponse{}, nil
}
func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) {
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
var resp *pbpeering.TrustBundleListByServiceResponse
handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
var err error
resp, err = pbpeering.NewPeeringServiceClient(conn).TrustBundleListByService(ctx, req)
return err
})
if handled || err != nil {
return resp, err
}
defer metrics.MeasureSince([]string{"peering", "trust_bundle_list_by_service"}, time.Now())
// TODO(peering): ACL check request token
// TODO(peering): handle blocking queries
entMeta := *structs.NodeEnterpriseMetaInPartition(req.Partition)
// TODO(peering): we're throwing away the index here that would tell us how to execute a blocking query
_, peers, err := s.Backend.Store().PeeringsForService(nil, req.ServiceName, entMeta)
if err != nil {
return nil, fmt.Errorf("failed to get peers for service %s: %v", req.ServiceName, err)
}
trustBundles := []*pbpeering.PeeringTrustBundle{}
for _, peer := range peers {
q := state.Query{
Value: strings.ToLower(peer.Name),
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(req.Partition),
}
_, trustBundle, err := s.Backend.Store().PeeringTrustBundleRead(nil, q)
if err != nil {
return nil, fmt.Errorf("failed to read trust bundle for peer %s: %v", peer.Name, err)
}
if trustBundle != nil {
trustBundles = append(trustBundles, trustBundle)
}
}
return &pbpeering.TrustBundleListByServiceResponse{Bundles: trustBundles}, nil
}
type BidirectionalStream interface {
Send(*pbpeering.ReplicationMessage) error
Recv() (*pbpeering.ReplicationMessage, error)

View File

@ -11,6 +11,8 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
@ -18,7 +20,6 @@ import (
"golang.org/x/sync/errgroup"
gogrpc "google.golang.org/grpc"
"github.com/hashicorp/consul/agent/consul/state"
grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
"github.com/hashicorp/consul/api"
@ -311,6 +312,359 @@ func TestPeeringService_List(t *testing.T) {
prototest.AssertDeepEqual(t, expect, resp)
}
func TestPeeringService_TrustBundleListByService(t *testing.T) {
// test executes the following scenario:
// 0 - initial setup test server, state store, RPC client, verify empty results
// 1 - create a service, verify results still empty
// 2 - create a peering, verify results still empty
// 3 - create a config entry, verify results still empty
// 4 - create trust bundles, verify bundles are returned
// 5 - delete the config entry, verify results empty
// 6 - restore config entry, verify bundles are returned
// 7 - add peering, trust bundles, wildcard config entry, verify updated results are present
// 8 - delete first config entry, verify bundles are returned
// 9 - delete the service, verify results empty
// Note: these steps are dependent on each other by design so that we can verify that
// combinations of services, peerings, trust bundles, and config entries all affect results
// fixed for the test
nodeName := "test-node"
// keep track of index across steps
var lastIdx uint64
// Create test server
// TODO(peering): see note on newTestServer, refactor to not use this
srv := newTestServer(t, nil)
store := srv.Server.FSM().State()
client := pbpeering.NewPeeringServiceClient(srv.ClientConn(t))
// Create a node up-front so that we can assign services to it if needed
svcNode := &structs.Node{Node: nodeName, Address: "127.0.0.1"}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, svcNode))
type testDeps struct {
services []string
peerings []*pbpeering.Peering
entries []*structs.ExportedServicesConfigEntry
bundles []*pbpeering.PeeringTrustBundle
}
setup := func(t *testing.T, idx uint64, deps testDeps) uint64 {
// Create any services (and node)
if len(deps.services) >= 0 {
svcNode := &structs.Node{Node: nodeName, Address: "127.0.0.1"}
idx++
require.NoError(t, store.EnsureNode(idx, svcNode))
// Create the test services
for _, svc := range deps.services {
idx++
require.NoError(t, store.EnsureService(idx, svcNode.Node, &structs.NodeService{
ID: svc,
Service: svc,
Port: int(8000 + idx),
}))
}
}
// Insert any peerings
for _, peering := range deps.peerings {
idx++
require.NoError(t, store.PeeringWrite(idx, peering))
// make sure it got created
q := state.Query{Value: peering.Name}
_, p, err := store.PeeringRead(nil, q)
require.NoError(t, err)
require.NotNil(t, p)
}
// Insert any trust bundles
for _, bundle := range deps.bundles {
idx++
require.NoError(t, store.PeeringTrustBundleWrite(idx, bundle))
q := state.Query{
Value: bundle.PeerName,
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(bundle.Partition),
}
gotIdx, ptb, err := store.PeeringTrustBundleRead(nil, q)
require.NoError(t, err)
require.NotNil(t, ptb)
require.Equal(t, gotIdx, idx)
}
// Write any config entries
for _, entry := range deps.entries {
idx++
require.NoError(t, store.EnsureConfigEntry(idx, entry))
}
return idx
}
type testCase struct {
req *pbpeering.TrustBundleListByServiceRequest
expect *pbpeering.TrustBundleListByServiceResponse
expectErr string
}
// TODO(peering): see note on newTestServer, once we have a better server mock,
// we should add functionality here to verify errors from backend
verify := func(t *testing.T, tc *testCase) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
resp, err := client.TrustBundleListByService(ctx, tc.req)
require.NoError(t, err)
// ignore raft fields
if resp.Bundles != nil {
for _, b := range resp.Bundles {
b.CreateIndex = 0
b.ModifyIndex = 0
}
}
prototest.AssertDeepEqual(t, tc.expect, resp)
}
// Execute scenario steps
// ----------------------
// 0 - initial empty state
// -----------------------
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: nil,
},
})
// 1 - create a service, verify results still empty
// ------------------------------------------------
lastIdx = setup(t, lastIdx, testDeps{services: []string{"foo"}})
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{},
},
})
// 2 - create a peering, verify results still empty
// ------------------------------------------------
lastIdx = setup(t, lastIdx, testDeps{
peerings: []*pbpeering.Peering{
{
Name: "peer1",
State: pbpeering.PeeringState_ACTIVE,
PeerServerName: "peer1-name",
PeerServerAddresses: []string{"peer1-addr"},
},
},
})
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{},
},
})
// 3 - create a config entry, verify results still empty
// -----------------------------------------------------
lastIdx = setup(t, lastIdx, testDeps{
entries: []*structs.ExportedServicesConfigEntry{
{
Name: "export-foo",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{
{
PeerName: "peer1",
},
},
},
},
},
},
})
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{},
},
})
// 4 - create trust bundles, verify bundles are returned
// -----------------------------------------------------
lastIdx = setup(t, lastIdx, testDeps{
bundles: []*pbpeering.PeeringTrustBundle{
{
TrustDomain: "peer1.com",
PeerName: "peer1",
RootPEMs: []string{"peer1-root-1"},
},
},
})
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{
{
TrustDomain: "peer1.com",
PeerName: "peer1",
RootPEMs: []string{"peer1-root-1"},
},
},
},
})
// 5 - delete the config entry, verify results empty
// -------------------------------------------------
lastIdx++
require.NoError(t, store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "export-foo", nil))
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{},
},
})
// 6 - restore config entry, verify bundles are returned
// -----------------------------------------------------
lastIdx = setup(t, lastIdx, testDeps{
entries: []*structs.ExportedServicesConfigEntry{
{
Name: "export-foo",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{
{PeerName: "peer1"},
},
},
},
},
},
})
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{
{
TrustDomain: "peer1.com",
PeerName: "peer1",
RootPEMs: []string{"peer1-root-1"},
},
},
},
})
// 7 - add peering, trust bundles, wildcard config entry, verify updated results are present
// -----------------------------------------------------------------------------------------
lastIdx = setup(t, lastIdx, testDeps{
services: []string{"bar"},
peerings: []*pbpeering.Peering{
{
Name: "peer2",
State: pbpeering.PeeringState_ACTIVE,
PeerServerName: "peer2-name",
PeerServerAddresses: []string{"peer2-addr"},
},
},
entries: []*structs.ExportedServicesConfigEntry{
{
Name: "export-all",
Services: []structs.ExportedService{
{
Name: structs.WildcardSpecifier,
Consumers: []structs.ServiceConsumer{
{PeerName: "peer1"},
{PeerName: "peer2"},
},
},
},
},
},
bundles: []*pbpeering.PeeringTrustBundle{
{
TrustDomain: "peer2.com",
PeerName: "peer2",
RootPEMs: []string{"peer2-root-1"},
},
},
})
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{
{
TrustDomain: "peer1.com",
PeerName: "peer1",
RootPEMs: []string{"peer1-root-1"},
},
{
TrustDomain: "peer2.com",
PeerName: "peer2",
RootPEMs: []string{"peer2-root-1"},
},
},
},
})
// 8 - delete first config entry, verify bundles are returned
lastIdx++
require.NoError(t, store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "export-foo", nil))
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{
{
TrustDomain: "peer1.com",
PeerName: "peer1",
RootPEMs: []string{"peer1-root-1"},
},
{
TrustDomain: "peer2.com",
PeerName: "peer2",
RootPEMs: []string{"peer2-root-1"},
},
},
},
})
// 9 - delete the service, verify results empty
lastIdx++
require.NoError(t, store.DeleteService(lastIdx, nodeName, "foo", nil, ""))
verify(t, &testCase{
req: &pbpeering.TrustBundleListByServiceRequest{
ServiceName: "foo",
},
expect: &pbpeering.TrustBundleListByServiceResponse{
Bundles: []*pbpeering.PeeringTrustBundle{},
},
})
}
func Test_StreamHandler_UpsertServices(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -107,6 +107,26 @@ func (msg *PeeringDeleteResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *TrustBundleListByServiceRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *TrustBundleListByServiceRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *TrustBundleListByServiceResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *TrustBundleListByServiceResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringTerminateByIDRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)

File diff suppressed because it is too large Load Diff

View File

@ -22,6 +22,9 @@ service PeeringService {
// Consider removing if we can find another way to populate state store in peering_endpoint_test.go
rpc PeeringWrite(PeeringWriteRequest) returns (PeeringWriteResponse);
// TODO(peering): Rename this to PeeredServiceRoots? or something like that?
rpc TrustBundleListByService(TrustBundleListByServiceRequest) returns (TrustBundleListByServiceResponse);
// StreamResources opens an event stream for resources to share between peers, such as services.
// Events are streamed as they happen.
rpc StreamResources(stream ReplicationMessage) returns (stream ReplicationMessage);
@ -165,6 +168,20 @@ message PeeringDeleteRequest {
message PeeringDeleteResponse {}
// @consul-rpc-glue: Datacenter,ReadTODO
message TrustBundleListByServiceRequest{
string ServiceName = 1;
string Partition = 2;
// these are common fields required for implementing structs.RPCInfo methods
// that are used to forward requests
string Datacenter = 3;
}
message TrustBundleListByServiceResponse{
repeated PeeringTrustBundle Bundles = 1;
}
message PeeringTerminateByIDRequest {
string ID = 1;
}

View File

@ -196,6 +196,55 @@ func (msg *PeeringDeleteRequest) TokenSecret() string {
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) RequestDatacenter() string {
if msg == nil {
return ""
}
return msg.Datacenter
}
// IsRead implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) IsRead() bool {
// TODO(peering): figure out read semantics here
return true
}
// AllowStaleRead implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) AllowStaleRead() bool {
// TODO(peering): figure out read semantics here
return false
}
// HasTimedOut implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) (bool, error) {
// TODO(peering): figure out read semantics here
return time.Since(start) > rpcHoldTimeout, nil
}
// Timeout implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
// TODO(peering): figure out read semantics here
return rpcHoldTimeout
}
// SetTokenSecret implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) SetTokenSecret(s string) {
// TODO(peering): figure out read semantics here
}
// TokenSecret implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) TokenSecret() string {
// TODO(peering): figure out read semantics here
return ""
}
// Token implements structs.RPCInfo
func (msg *TrustBundleListByServiceRequest) Token() string {
// TODO(peering): figure out read semantics here
return ""
}
// RequestDatacenter implements structs.RPCInfo
func (msg *PeeringTrustBundleWriteRequest) RequestDatacenter() string {
if msg == nil {