[OSS] Add grpc endpoint to fetch a specific trust bundle (#13292)

Co-authored-by: R.B. Boyer <rb@hashicorp.com>
This commit is contained in:
Freddy 2022-05-31 09:54:40 -06:00 committed by GitHub
parent 8e24a56134
commit 9427700270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 655 additions and 271 deletions

View File

@ -400,6 +400,40 @@ func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelet
return &pbpeering.PeeringDeleteResponse{}, nil return &pbpeering.PeeringDeleteResponse{}, nil
} }
func (s *Service) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error) {
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
var resp *pbpeering.TrustBundleReadResponse
handled, err := s.Backend.Forward(req, func(conn *grpc.ClientConn) error {
var err error
resp, err = pbpeering.NewPeeringServiceClient(conn).TrustBundleRead(ctx, req)
return err
})
if handled || err != nil {
return resp, err
}
defer metrics.MeasureSince([]string{"peering", "trust_bundle_read"}, time.Now())
// TODO(peering): ACL check request token
// TODO(peering): handle blocking queries
idx, trustBundle, err := s.Backend.Store().PeeringTrustBundleRead(nil, state.Query{
Value: req.Name,
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(req.Partition),
})
if err != nil {
return nil, fmt.Errorf("failed to read trust bundle for peer %s: %w", req.Name, err)
}
return &pbpeering.TrustBundleReadResponse{
Index: idx,
Bundle: trustBundle,
}, nil
}
func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) { func (s *Service) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) {
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())

View File

@ -311,6 +311,53 @@ func TestPeeringService_List(t *testing.T) {
prototest.AssertDeepEqual(t, expect, resp) prototest.AssertDeepEqual(t, expect, resp)
} }
func TestPeeringService_TrustBundleRead(t *testing.T) {
srv := newTestServer(t, nil)
store := srv.Server.FSM().State()
client := pbpeering.NewPeeringServiceClient(srv.ClientConn(t))
var lastIdx uint64 = 1
_ = setupTestPeering(t, store, "my-peering", lastIdx)
mysql := &structs.CheckServiceNode{
Node: &structs.Node{
Node: "node1",
Address: "10.0.0.1",
PeerName: "my-peering",
},
Service: &structs.NodeService{
ID: "mysql-1",
Service: "mysql",
Port: 5000,
PeerName: "my-peering",
},
}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, mysql.Node.Node, mysql.Service))
bundle := &pbpeering.PeeringTrustBundle{
TrustDomain: "peer1.com",
PeerName: "my-peering",
RootPEMs: []string{"peer1-root-1"},
}
lastIdx++
require.NoError(t, store.PeeringTrustBundleWrite(lastIdx, bundle))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := client.TrustBundleRead(ctx, &pbpeering.TrustBundleReadRequest{
Name: "my-peering",
})
require.NoError(t, err)
require.Equal(t, lastIdx, resp.Index)
require.NotNil(t, resp.Bundle)
prototest.AssertDeepEqual(t, bundle, resp.Bundle)
}
func TestPeeringService_TrustBundleListByService(t *testing.T) { func TestPeeringService_TrustBundleListByService(t *testing.T) {
// test executes the following scenario: // test executes the following scenario:
// 0 - initial setup test server, state store, RPC client, verify empty results // 0 - initial setup test server, state store, RPC client, verify empty results
@ -1173,3 +1220,16 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps {
GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor, GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor,
} }
} }
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
err := store.PeeringWrite(index, &pbpeering.Peering{
Name: name,
})
require.NoError(t, err)
_, p, err := store.PeeringRead(nil, state.Query{Value: name})
require.NoError(t, err)
require.NotNil(t, p)
return p.ID
}

View File

@ -127,6 +127,26 @@ func (msg *TrustBundleListByServiceResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *TrustBundleReadRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *TrustBundleReadRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *TrustBundleReadResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *TrustBundleReadResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler // MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringTerminateByIDRequest) MarshalBinary() ([]byte, error) { func (msg *PeeringTerminateByIDRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg) return proto.Marshal(msg)

File diff suppressed because it is too large Load Diff

View File

@ -22,6 +22,8 @@ service PeeringService {
// TODO(peering): Rename this to PeeredServiceRoots? or something like that? // TODO(peering): Rename this to PeeredServiceRoots? or something like that?
rpc TrustBundleListByService(TrustBundleListByServiceRequest) returns (TrustBundleListByServiceResponse); rpc TrustBundleListByService(TrustBundleListByServiceRequest) returns (TrustBundleListByServiceResponse);
rpc TrustBundleRead(TrustBundleReadRequest) returns (TrustBundleReadResponse);
// StreamResources opens an event stream for resources to share between peers, such as services. // StreamResources opens an event stream for resources to share between peers, such as services.
// Events are streamed as they happen. // Events are streamed as they happen.
// buf:lint:ignore RPC_REQUEST_STANDARD_NAME // buf:lint:ignore RPC_REQUEST_STANDARD_NAME
@ -190,6 +192,21 @@ message TrustBundleListByServiceResponse {
repeated PeeringTrustBundle Bundles = 1; repeated PeeringTrustBundle Bundles = 1;
} }
// @consul-rpc-glue: Datacenter,ReadTODO
message TrustBundleReadRequest {
string Name = 1;
string Partition = 2;
// these are common fields required for implementing structs.RPCInfo methods
// that are used to forward requests
string Datacenter = 3;
}
message TrustBundleReadResponse {
uint64 Index = 1;
PeeringTrustBundle Bundle = 2;
}
message PeeringTerminateByIDRequest { message PeeringTerminateByIDRequest {
string ID = 1; string ID = 1;
} }
@ -282,8 +299,7 @@ message InitiateRequest {
// target=github.com/hashicorp/consul/api.PeeringInitiateResponse // target=github.com/hashicorp/consul/api.PeeringInitiateResponse
// output=peering.gen.go // output=peering.gen.go
// name=API // name=API
message InitiateResponse { message InitiateResponse {}
}
message ReplicationMessage { message ReplicationMessage {
oneof Payload { oneof Payload {

View File

@ -245,6 +245,55 @@ func (msg *TrustBundleListByServiceRequest) Token() string {
return "" return ""
} }
// RequestDatacenter implements structs.RPCInfo
func (msg *TrustBundleReadRequest) RequestDatacenter() string {
if msg == nil {
return ""
}
return msg.Datacenter
}
// IsRead implements structs.RPCInfo
func (msg *TrustBundleReadRequest) IsRead() bool {
// TODO(peering): figure out read semantics here
return true
}
// AllowStaleRead implements structs.RPCInfo
func (msg *TrustBundleReadRequest) AllowStaleRead() bool {
// TODO(peering): figure out read semantics here
return false
}
// HasTimedOut implements structs.RPCInfo
func (msg *TrustBundleReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) (bool, error) {
// TODO(peering): figure out read semantics here
return time.Since(start) > rpcHoldTimeout, nil
}
// Timeout implements structs.RPCInfo
func (msg *TrustBundleReadRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
// TODO(peering): figure out read semantics here
return rpcHoldTimeout
}
// SetTokenSecret implements structs.RPCInfo
func (msg *TrustBundleReadRequest) SetTokenSecret(s string) {
// TODO(peering): figure out read semantics here
}
// TokenSecret implements structs.RPCInfo
func (msg *TrustBundleReadRequest) TokenSecret() string {
// TODO(peering): figure out read semantics here
return ""
}
// Token implements structs.RPCInfo
func (msg *TrustBundleReadRequest) Token() string {
// TODO(peering): figure out read semantics here
return ""
}
// RequestDatacenter implements structs.RPCInfo // RequestDatacenter implements structs.RPCInfo
func (msg *PeeringTrustBundleWriteRequest) RequestDatacenter() string { func (msg *PeeringTrustBundleWriteRequest) RequestDatacenter() string {
if msg == nil { if msg == nil {

View File

@ -32,6 +32,7 @@ type PeeringServiceClient interface {
PeeringWrite(ctx context.Context, in *PeeringWriteRequest, opts ...grpc.CallOption) (*PeeringWriteResponse, error) PeeringWrite(ctx context.Context, in *PeeringWriteRequest, opts ...grpc.CallOption) (*PeeringWriteResponse, error)
// TODO(peering): Rename this to PeeredServiceRoots? or something like that? // TODO(peering): Rename this to PeeredServiceRoots? or something like that?
TrustBundleListByService(ctx context.Context, in *TrustBundleListByServiceRequest, opts ...grpc.CallOption) (*TrustBundleListByServiceResponse, error) TrustBundleListByService(ctx context.Context, in *TrustBundleListByServiceRequest, opts ...grpc.CallOption) (*TrustBundleListByServiceResponse, error)
TrustBundleRead(ctx context.Context, in *TrustBundleReadRequest, opts ...grpc.CallOption) (*TrustBundleReadResponse, error)
// StreamResources opens an event stream for resources to share between peers, such as services. // StreamResources opens an event stream for resources to share between peers, such as services.
// Events are streamed as they happen. // Events are streamed as they happen.
// buf:lint:ignore RPC_REQUEST_STANDARD_NAME // buf:lint:ignore RPC_REQUEST_STANDARD_NAME
@ -111,6 +112,15 @@ func (c *peeringServiceClient) TrustBundleListByService(ctx context.Context, in
return out, nil return out, nil
} }
func (c *peeringServiceClient) TrustBundleRead(ctx context.Context, in *TrustBundleReadRequest, opts ...grpc.CallOption) (*TrustBundleReadResponse, error) {
out := new(TrustBundleReadResponse)
err := c.cc.Invoke(ctx, "/peering.PeeringService/TrustBundleRead", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *peeringServiceClient) StreamResources(ctx context.Context, opts ...grpc.CallOption) (PeeringService_StreamResourcesClient, error) { func (c *peeringServiceClient) StreamResources(ctx context.Context, opts ...grpc.CallOption) (PeeringService_StreamResourcesClient, error) {
stream, err := c.cc.NewStream(ctx, &PeeringService_ServiceDesc.Streams[0], "/peering.PeeringService/StreamResources", opts...) stream, err := c.cc.NewStream(ctx, &PeeringService_ServiceDesc.Streams[0], "/peering.PeeringService/StreamResources", opts...)
if err != nil { if err != nil {
@ -156,6 +166,7 @@ type PeeringServiceServer interface {
PeeringWrite(context.Context, *PeeringWriteRequest) (*PeeringWriteResponse, error) PeeringWrite(context.Context, *PeeringWriteRequest) (*PeeringWriteResponse, error)
// TODO(peering): Rename this to PeeredServiceRoots? or something like that? // TODO(peering): Rename this to PeeredServiceRoots? or something like that?
TrustBundleListByService(context.Context, *TrustBundleListByServiceRequest) (*TrustBundleListByServiceResponse, error) TrustBundleListByService(context.Context, *TrustBundleListByServiceRequest) (*TrustBundleListByServiceResponse, error)
TrustBundleRead(context.Context, *TrustBundleReadRequest) (*TrustBundleReadResponse, error)
// StreamResources opens an event stream for resources to share between peers, such as services. // StreamResources opens an event stream for resources to share between peers, such as services.
// Events are streamed as they happen. // Events are streamed as they happen.
// buf:lint:ignore RPC_REQUEST_STANDARD_NAME // buf:lint:ignore RPC_REQUEST_STANDARD_NAME
@ -189,6 +200,9 @@ func (UnimplementedPeeringServiceServer) PeeringWrite(context.Context, *PeeringW
func (UnimplementedPeeringServiceServer) TrustBundleListByService(context.Context, *TrustBundleListByServiceRequest) (*TrustBundleListByServiceResponse, error) { func (UnimplementedPeeringServiceServer) TrustBundleListByService(context.Context, *TrustBundleListByServiceRequest) (*TrustBundleListByServiceResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TrustBundleListByService not implemented") return nil, status.Errorf(codes.Unimplemented, "method TrustBundleListByService not implemented")
} }
func (UnimplementedPeeringServiceServer) TrustBundleRead(context.Context, *TrustBundleReadRequest) (*TrustBundleReadResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TrustBundleRead not implemented")
}
func (UnimplementedPeeringServiceServer) StreamResources(PeeringService_StreamResourcesServer) error { func (UnimplementedPeeringServiceServer) StreamResources(PeeringService_StreamResourcesServer) error {
return status.Errorf(codes.Unimplemented, "method StreamResources not implemented") return status.Errorf(codes.Unimplemented, "method StreamResources not implemented")
} }
@ -330,6 +344,24 @@ func _PeeringService_TrustBundleListByService_Handler(srv interface{}, ctx conte
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _PeeringService_TrustBundleRead_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TrustBundleReadRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PeeringServiceServer).TrustBundleRead(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/peering.PeeringService/TrustBundleRead",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PeeringServiceServer).TrustBundleRead(ctx, req.(*TrustBundleReadRequest))
}
return interceptor(ctx, in, info, handler)
}
func _PeeringService_StreamResources_Handler(srv interface{}, stream grpc.ServerStream) error { func _PeeringService_StreamResources_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(PeeringServiceServer).StreamResources(&peeringServiceStreamResourcesServer{stream}) return srv.(PeeringServiceServer).StreamResources(&peeringServiceStreamResourcesServer{stream})
} }
@ -391,6 +423,10 @@ var PeeringService_ServiceDesc = grpc.ServiceDesc{
MethodName: "TrustBundleListByService", MethodName: "TrustBundleListByService",
Handler: _PeeringService_TrustBundleListByService_Handler, Handler: _PeeringService_TrustBundleListByService_Handler,
}, },
{
MethodName: "TrustBundleRead",
Handler: _PeeringService_TrustBundleRead_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {